res_stasis.c 52 KB


  1. /*
  2. * Asterisk -- An open source telephony toolkit.
  3. *
  4. * Copyright (C) 2012 - 2013, Digium, Inc.
  5. *
  6. * David M. Lee, II <dlee@digium.com>
  7. *
  8. * See http://www.asterisk.org for more information about
  9. * the Asterisk project. Please do not directly contact
  10. * any of the maintainers of this project for assistance;
  11. * the project provides a web site, mailing lists and IRC
  12. * channels for your use.
  13. *
  14. * This program is free software, distributed under the terms of
  15. * the GNU General Public License Version 2. See the LICENSE file
  16. * at the top of the source tree.
  17. */
  18. /*! \file
  19. *
  20. * \brief Stasis application support.
  21. *
  22. * \author David M. Lee, II <dlee@digium.com>
  23. *
  24. * <code>res_stasis.so</code> brings together the various components of the
  25. * Stasis application infrastructure.
  26. *
  27. * First, there's the Stasis application handler, stasis_app_exec(). This is
  28. * called by <code>app_stasis.so</code> to give control of a channel to the
  29. * Stasis application code from the dialplan.
  30. *
  31. * While a channel is in stasis_app_exec(), it has a \ref stasis_app_control
  32. * object, which may be used to control the channel.
  33. *
  34. * To control the channel, commands may be sent to channel using
  35. * stasis_app_send_command() and stasis_app_send_async_command().
  36. *
  37. * Alongside this, applications may be registered/unregistered using
  38. * stasis_app_register()/stasis_app_unregister(). While a channel is in Stasis,
  39. * events received on the channel's topic are converted to JSON and forwarded to
  40. * the \ref stasis_app_cb. The application may also subscribe to the channel to
  41. * continue to receive messages even after the channel has left Stasis, but it
  42. * will not be able to control it.
  43. *
  44. * Given all the stuff that comes together in this module, it's been broken up
  45. * into several pieces that are in <code>res/stasis/</code> and compiled into
  46. * <code>res_stasis.so</code>.
  47. */
  48. /*** MODULEINFO
  49. <support_level>core</support_level>
  50. ***/
  51. #include "asterisk.h"
  52. ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
  53. #include "asterisk/astobj2.h"
  54. #include "asterisk/callerid.h"
  55. #include "asterisk/module.h"
  56. #include "asterisk/stasis_app_impl.h"
  57. #include "asterisk/stasis_channels.h"
  58. #include "asterisk/stasis_bridges.h"
  59. #include "asterisk/stasis_endpoints.h"
  60. #include "asterisk/stasis_message_router.h"
  61. #include "asterisk/strings.h"
  62. #include "stasis/app.h"
  63. #include "stasis/control.h"
  64. #include "stasis/messaging.h"
  65. #include "stasis/stasis_bridge.h"
  66. #include "asterisk/core_unreal.h"
  67. #include "asterisk/musiconhold.h"
  68. #include "asterisk/causes.h"
  69. #include "asterisk/stringfields.h"
  70. #include "asterisk/bridge_after.h"
  71. #include "asterisk/format_cache.h"
  72. /*! Time to wait for a frame in the application */
  73. #define MAX_WAIT_MS 200
  74. /*!
  75. * \brief Number of buckets for the Stasis application hash table. Remember to
  76. * keep it a prime number!
  77. */
  78. #define APPS_NUM_BUCKETS 127
  79. /*!
  80. * \brief Number of buckets for the Stasis application hash table. Remember to
  81. * keep it a prime number!
  82. */
  83. #define CONTROLS_NUM_BUCKETS 127
  84. /*!
  85. * \brief Number of buckets for the Stasis bridges hash table. Remember to
  86. * keep it a prime number!
  87. */
  88. #define BRIDGES_NUM_BUCKETS 127
  89. /*!
  90. * \brief Stasis application container.
  91. */
  92. struct ao2_container *apps_registry;
  93. struct ao2_container *app_controls;
  94. struct ao2_container *app_bridges;
  95. struct ao2_container *app_bridges_moh;
  96. struct ao2_container *app_bridges_playback;
  97. static struct ast_json *stasis_end_json_payload(struct ast_channel_snapshot *snapshot,
  98. const struct stasis_message_sanitizer *sanitize)
  99. {
  100. return ast_json_pack("{s: s, s: o, s: o}",
  101. "type", "StasisEnd",
  102. "timestamp", ast_json_timeval(ast_tvnow(), NULL),
  103. "channel", ast_channel_snapshot_to_json(snapshot, sanitize));
  104. }
  105. static struct ast_json *stasis_end_to_json(struct stasis_message *message,
  106. const struct stasis_message_sanitizer *sanitize)
  107. {
  108. struct ast_channel_blob *payload = stasis_message_data(message);
  109. if (sanitize && sanitize->channel_snapshot &&
  110. sanitize->channel_snapshot(payload->snapshot)) {
  111. return NULL;
  112. }
  113. return stasis_end_json_payload(payload->snapshot, sanitize);
  114. }
  115. STASIS_MESSAGE_TYPE_DEFN(ast_stasis_end_message_type,
  116. .to_json = stasis_end_to_json);
  117. const char *stasis_app_name(const struct stasis_app *app)
  118. {
  119. return app_name(app);
  120. }
  121. /*! AO2 hash function for \ref app */
  122. static int app_hash(const void *obj, const int flags)
  123. {
  124. const struct stasis_app *app;
  125. const char *key;
  126. switch (flags & OBJ_SEARCH_MASK) {
  127. case OBJ_SEARCH_KEY:
  128. key = obj;
  129. break;
  130. case OBJ_SEARCH_OBJECT:
  131. app = obj;
  132. key = stasis_app_name(app);
  133. break;
  134. default:
  135. /* Hash can only work on something with a full key. */
  136. ast_assert(0);
  137. return 0;
  138. }
  139. return ast_str_hash(key);
  140. }
  141. /*! AO2 comparison function for \ref app */
  142. static int app_compare(void *obj, void *arg, int flags)
  143. {
  144. const struct stasis_app *object_left = obj;
  145. const struct stasis_app *object_right = arg;
  146. const char *right_key = arg;
  147. int cmp;
  148. switch (flags & OBJ_SEARCH_MASK) {
  149. case OBJ_SEARCH_OBJECT:
  150. right_key = stasis_app_name(object_right);
  151. /* Fall through */
  152. case OBJ_SEARCH_KEY:
  153. cmp = strcmp(stasis_app_name(object_left), right_key);
  154. break;
  155. case OBJ_SEARCH_PARTIAL_KEY:
  156. /*
  157. * We could also use a partial key struct containing a length
  158. * so strlen() does not get called for every comparison instead.
  159. */
  160. cmp = strncmp(stasis_app_name(object_left), right_key, strlen(right_key));
  161. break;
  162. default:
  163. /*
  164. * What arg points to is specific to this traversal callback
  165. * and has no special meaning to astobj2.
  166. */
  167. cmp = 0;
  168. break;
  169. }
  170. if (cmp) {
  171. return 0;
  172. }
  173. /*
  174. * At this point the traversal callback is identical to a sorted
  175. * container.
  176. */
  177. return CMP_MATCH;
  178. }
  179. /*! AO2 hash function for \ref stasis_app_control */
  180. static int control_hash(const void *obj, const int flags)
  181. {
  182. const struct stasis_app_control *control;
  183. const char *key;
  184. switch (flags & OBJ_SEARCH_MASK) {
  185. case OBJ_SEARCH_KEY:
  186. key = obj;
  187. break;
  188. case OBJ_SEARCH_OBJECT:
  189. control = obj;
  190. key = stasis_app_control_get_channel_id(control);
  191. break;
  192. default:
  193. /* Hash can only work on something with a full key. */
  194. ast_assert(0);
  195. return 0;
  196. }
  197. return ast_str_hash(key);
  198. }
  199. /*! AO2 comparison function for \ref stasis_app_control */
  200. static int control_compare(void *obj, void *arg, int flags)
  201. {
  202. const struct stasis_app_control *object_left = obj;
  203. const struct stasis_app_control *object_right = arg;
  204. const char *right_key = arg;
  205. int cmp;
  206. switch (flags & OBJ_SEARCH_MASK) {
  207. case OBJ_SEARCH_OBJECT:
  208. right_key = stasis_app_control_get_channel_id(object_right);
  209. /* Fall through */
  210. case OBJ_SEARCH_KEY:
  211. cmp = strcmp(stasis_app_control_get_channel_id(object_left), right_key);
  212. break;
  213. case OBJ_SEARCH_PARTIAL_KEY:
  214. /*
  215. * We could also use a partial key struct containing a length
  216. * so strlen() does not get called for every comparison instead.
  217. */
  218. cmp = strncmp(stasis_app_control_get_channel_id(object_left), right_key, strlen(right_key));
  219. break;
  220. default:
  221. /*
  222. * What arg points to is specific to this traversal callback
  223. * and has no special meaning to astobj2.
  224. */
  225. cmp = 0;
  226. break;
  227. }
  228. if (cmp) {
  229. return 0;
  230. }
  231. /*
  232. * At this point the traversal callback is identical to a sorted
  233. * container.
  234. */
  235. return CMP_MATCH;
  236. }
  237. static int cleanup_cb(void *obj, void *arg, int flags)
  238. {
  239. struct stasis_app *app = obj;
  240. if (!app_is_finished(app)) {
  241. return 0;
  242. }
  243. ast_verb(1, "Shutting down application '%s'\n", stasis_app_name(app));
  244. app_shutdown(app);
  245. return CMP_MATCH;
  246. }
  247. /*!
  248. * \brief Clean up any old apps that we don't need any more.
  249. */
  250. static void cleanup(void)
  251. {
  252. ao2_callback(apps_registry, OBJ_MULTIPLE | OBJ_NODATA | OBJ_UNLINK,
  253. cleanup_cb, NULL);
  254. }
  255. struct stasis_app_control *stasis_app_control_create(struct ast_channel *chan)
  256. {
  257. return control_create(chan, NULL);
  258. }
  259. struct stasis_app_control *stasis_app_control_find_by_channel(
  260. const struct ast_channel *chan)
  261. {
  262. if (chan == NULL) {
  263. return NULL;
  264. }
  265. return stasis_app_control_find_by_channel_id(
  266. ast_channel_uniqueid(chan));
  267. }
  268. struct stasis_app_control *stasis_app_control_find_by_channel_id(
  269. const char *channel_id)
  270. {
  271. return ao2_find(app_controls, channel_id, OBJ_SEARCH_KEY);
  272. }
  273. /*! AO2 hash function for bridges container */
  274. static int bridges_hash(const void *obj, const int flags)
  275. {
  276. const struct ast_bridge *bridge;
  277. const char *key;
  278. switch (flags & OBJ_SEARCH_MASK) {
  279. case OBJ_SEARCH_KEY:
  280. key = obj;
  281. break;
  282. case OBJ_SEARCH_OBJECT:
  283. bridge = obj;
  284. key = bridge->uniqueid;
  285. break;
  286. default:
  287. /* Hash can only work on something with a full key. */
  288. ast_assert(0);
  289. return 0;
  290. }
  291. return ast_str_hash(key);
  292. }
  293. /*! AO2 comparison function for bridges container */
  294. static int bridges_compare(void *obj, void *arg, int flags)
  295. {
  296. const struct ast_bridge *object_left = obj;
  297. const struct ast_bridge *object_right = arg;
  298. const char *right_key = arg;
  299. int cmp;
  300. switch (flags & OBJ_SEARCH_MASK) {
  301. case OBJ_SEARCH_OBJECT:
  302. right_key = object_right->uniqueid;
  303. /* Fall through */
  304. case OBJ_SEARCH_KEY:
  305. cmp = strcmp(object_left->uniqueid, right_key);
  306. break;
  307. case OBJ_SEARCH_PARTIAL_KEY:
  308. /*
  309. * We could also use a partial key struct containing a length
  310. * so strlen() does not get called for every comparison instead.
  311. */
  312. cmp = strncmp(object_left->uniqueid, right_key, strlen(right_key));
  313. break;
  314. default:
  315. /*
  316. * What arg points to is specific to this traversal callback
  317. * and has no special meaning to astobj2.
  318. */
  319. cmp = 0;
  320. break;
  321. }
  322. if (cmp) {
  323. return 0;
  324. }
  325. /*
  326. * At this point the traversal callback is identical to a sorted
  327. * container.
  328. */
  329. return CMP_MATCH;
  330. }
  331. /*!
  332. * Used with app_bridges_moh and app_bridge_control, they provide links
  333. * between bridges and channels used for ARI application purposes
  334. */
  335. struct stasis_app_bridge_channel_wrapper {
  336. AST_DECLARE_STRING_FIELDS(
  337. AST_STRING_FIELD(channel_id);
  338. AST_STRING_FIELD(bridge_id);
  339. );
  340. };
  341. static void stasis_app_bridge_channel_wrapper_destructor(void *obj)
  342. {
  343. struct stasis_app_bridge_channel_wrapper *wrapper = obj;
  344. ast_string_field_free_memory(wrapper);
  345. }
  346. /*! AO2 hash function for the bridges moh container */
  347. static int bridges_channel_hash_fn(const void *obj, const int flags)
  348. {
  349. const struct stasis_app_bridge_channel_wrapper *wrapper;
  350. const char *key;
  351. switch (flags & OBJ_SEARCH_MASK) {
  352. case OBJ_SEARCH_KEY:
  353. key = obj;
  354. break;
  355. case OBJ_SEARCH_OBJECT:
  356. wrapper = obj;
  357. key = wrapper->bridge_id;
  358. break;
  359. default:
  360. /* Hash can only work on something with a full key. */
  361. ast_assert(0);
  362. return 0;
  363. }
  364. return ast_str_hash(key);
  365. }
  366. static int bridges_channel_sort_fn(const void *obj_left, const void *obj_right, const int flags)
  367. {
  368. const struct stasis_app_bridge_channel_wrapper *left = obj_left;
  369. const struct stasis_app_bridge_channel_wrapper *right = obj_right;
  370. const char *right_key = obj_right;
  371. int cmp;
  372. switch (flags & OBJ_SEARCH_MASK) {
  373. case OBJ_SEARCH_OBJECT:
  374. right_key = right->bridge_id;
  375. /* Fall through */
  376. case OBJ_SEARCH_KEY:
  377. cmp = strcmp(left->bridge_id, right_key);
  378. break;
  379. case OBJ_SEARCH_PARTIAL_KEY:
  380. cmp = strncmp(left->bridge_id, right_key, strlen(right_key));
  381. break;
  382. default:
  383. /* Sort can only work on something with a full or partial key. */
  384. ast_assert(0);
  385. cmp = 0;
  386. break;
  387. }
  388. return cmp;
  389. }
  390. /*! Removes the bridge to music on hold channel link */
  391. static void remove_bridge_moh(char *bridge_id)
  392. {
  393. ao2_find(app_bridges_moh, bridge_id, OBJ_SEARCH_KEY | OBJ_UNLINK | OBJ_NODATA);
  394. ast_free(bridge_id);
  395. }
  396. /*! After bridge failure callback for moh channels */
  397. static void moh_after_bridge_cb_failed(enum ast_bridge_after_cb_reason reason, void *data)
  398. {
  399. char *bridge_id = data;
  400. remove_bridge_moh(bridge_id);
  401. }
  402. /*! After bridge callback for moh channels */
  403. static void moh_after_bridge_cb(struct ast_channel *chan, void *data)
  404. {
  405. char *bridge_id = data;
  406. remove_bridge_moh(bridge_id);
  407. }
  408. /*! Request a bridge MOH channel */
  409. static struct ast_channel *prepare_bridge_moh_channel(void)
  410. {
  411. RAII_VAR(struct ast_format_cap *, cap, NULL, ao2_cleanup);
  412. cap = ast_format_cap_alloc(AST_FORMAT_CAP_FLAG_DEFAULT);
  413. if (!cap) {
  414. return NULL;
  415. }
  416. ast_format_cap_append(cap, ast_format_slin, 0);
  417. return ast_request("Announcer", cap, NULL, NULL, "ARI_MOH", NULL);
  418. }
  419. /*! Provides the moh channel with a thread so it can actually play its music */
  420. static void *moh_channel_thread(void *data)
  421. {
  422. struct ast_channel *moh_channel = data;
  423. while (!ast_safe_sleep(moh_channel, 1000)) {
  424. }
  425. ast_moh_stop(moh_channel);
  426. ast_hangup(moh_channel);
  427. return NULL;
  428. }
  429. /*!
  430. * \internal
  431. * \brief Creates, pushes, and links a channel for playing music on hold to bridge
  432. *
  433. * \param bridge Which bridge this moh channel exists for
  434. *
  435. * \retval NULL if the channel could not be created, pushed, or linked
  436. * \retval Reference to the channel on success
  437. */
  438. static struct ast_channel *bridge_moh_create(struct ast_bridge *bridge)
  439. {
  440. RAII_VAR(struct stasis_app_bridge_channel_wrapper *, new_wrapper, NULL, ao2_cleanup);
  441. RAII_VAR(char *, bridge_id, ast_strdup(bridge->uniqueid), ast_free);
  442. struct ast_channel *chan;
  443. pthread_t threadid;
  444. if (!bridge_id) {
  445. return NULL;
  446. }
  447. chan = prepare_bridge_moh_channel();
  448. if (!chan) {
  449. return NULL;
  450. }
  451. if (stasis_app_channel_unreal_set_internal(chan)) {
  452. ast_hangup(chan);
  453. return NULL;
  454. }
  455. /* The after bridge callback assumes responsibility of the bridge_id. */
  456. if (ast_bridge_set_after_callback(chan,
  457. moh_after_bridge_cb, moh_after_bridge_cb_failed, bridge_id)) {
  458. ast_hangup(chan);
  459. return NULL;
  460. }
  461. bridge_id = NULL;
  462. if (ast_unreal_channel_push_to_bridge(chan, bridge,
  463. AST_BRIDGE_CHANNEL_FLAG_IMMOVABLE | AST_BRIDGE_CHANNEL_FLAG_LONELY)) {
  464. ast_hangup(chan);
  465. return NULL;
  466. }
  467. new_wrapper = ao2_alloc_options(sizeof(*new_wrapper),
  468. stasis_app_bridge_channel_wrapper_destructor, AO2_ALLOC_OPT_LOCK_NOLOCK);
  469. if (!new_wrapper) {
  470. ast_hangup(chan);
  471. return NULL;
  472. }
  473. if (ast_string_field_init(new_wrapper, 32)) {
  474. ast_hangup(chan);
  475. return NULL;
  476. }
  477. ast_string_field_set(new_wrapper, bridge_id, bridge->uniqueid);
  478. ast_string_field_set(new_wrapper, channel_id, ast_channel_uniqueid(chan));
  479. if (!ao2_link_flags(app_bridges_moh, new_wrapper, OBJ_NOLOCK)) {
  480. ast_hangup(chan);
  481. return NULL;
  482. }
  483. if (ast_pthread_create_detached(&threadid, NULL, moh_channel_thread, chan)) {
  484. ast_log(LOG_ERROR, "Failed to create channel thread. Abandoning MOH channel creation.\n");
  485. ao2_unlink_flags(app_bridges_moh, new_wrapper, OBJ_NOLOCK);
  486. ast_hangup(chan);
  487. return NULL;
  488. }
  489. return chan;
  490. }
  491. struct ast_channel *stasis_app_bridge_moh_channel(struct ast_bridge *bridge)
  492. {
  493. RAII_VAR(struct stasis_app_bridge_channel_wrapper *, moh_wrapper, NULL, ao2_cleanup);
  494. {
  495. SCOPED_AO2LOCK(lock, app_bridges_moh);
  496. moh_wrapper = ao2_find(app_bridges_moh, bridge->uniqueid, OBJ_SEARCH_KEY | OBJ_NOLOCK);
  497. if (!moh_wrapper) {
  498. return bridge_moh_create(bridge);
  499. }
  500. }
  501. return ast_channel_get_by_name(moh_wrapper->channel_id);
  502. }
  503. int stasis_app_bridge_moh_stop(struct ast_bridge *bridge)
  504. {
  505. RAII_VAR(struct stasis_app_bridge_channel_wrapper *, moh_wrapper, NULL, ao2_cleanup);
  506. struct ast_channel *chan;
  507. moh_wrapper = ao2_find(app_bridges_moh, bridge->uniqueid, OBJ_SEARCH_KEY | OBJ_UNLINK);
  508. if (!moh_wrapper) {
  509. return -1;
  510. }
  511. chan = ast_channel_get_by_name(moh_wrapper->channel_id);
  512. if (!chan) {
  513. return -1;
  514. }
  515. ast_moh_stop(chan);
  516. ast_softhangup(chan, AST_CAUSE_NORMAL_CLEARING);
  517. ao2_cleanup(chan);
  518. return 0;
  519. }
  520. /*! Removes the bridge to playback channel link */
  521. static void remove_bridge_playback(char *bridge_id)
  522. {
  523. struct stasis_app_bridge_channel_wrapper *wrapper;
  524. struct stasis_app_control *control;
  525. wrapper = ao2_find(app_bridges_playback, bridge_id, OBJ_SEARCH_KEY | OBJ_UNLINK);
  526. if (wrapper) {
  527. control = stasis_app_control_find_by_channel_id(wrapper->channel_id);
  528. if (control) {
  529. ao2_unlink(app_controls, control);
  530. ao2_ref(control, -1);
  531. }
  532. ao2_ref(wrapper, -1);
  533. }
  534. ast_free(bridge_id);
  535. }
  536. static void playback_after_bridge_cb_failed(enum ast_bridge_after_cb_reason reason, void *data)
  537. {
  538. char *bridge_id = data;
  539. remove_bridge_playback(bridge_id);
  540. }
  541. static void playback_after_bridge_cb(struct ast_channel *chan, void *data)
  542. {
  543. char *bridge_id = data;
  544. remove_bridge_playback(bridge_id);
  545. }
  546. int stasis_app_bridge_playback_channel_add(struct ast_bridge *bridge,
  547. struct ast_channel *chan,
  548. struct stasis_app_control *control)
  549. {
  550. RAII_VAR(struct stasis_app_bridge_channel_wrapper *, new_wrapper, NULL, ao2_cleanup);
  551. char *bridge_id = ast_strdup(bridge->uniqueid);
  552. if (!bridge_id) {
  553. return -1;
  554. }
  555. if (ast_bridge_set_after_callback(chan,
  556. playback_after_bridge_cb, playback_after_bridge_cb_failed, bridge_id)) {
  557. ast_free(bridge_id);
  558. return -1;
  559. }
  560. new_wrapper = ao2_alloc_options(sizeof(*new_wrapper),
  561. stasis_app_bridge_channel_wrapper_destructor, AO2_ALLOC_OPT_LOCK_NOLOCK);
  562. if (!new_wrapper) {
  563. return -1;
  564. }
  565. if (ast_string_field_init(new_wrapper, 32)) {
  566. return -1;
  567. }
  568. ast_string_field_set(new_wrapper, bridge_id, bridge->uniqueid);
  569. ast_string_field_set(new_wrapper, channel_id, ast_channel_uniqueid(chan));
  570. if (!ao2_link(app_bridges_playback, new_wrapper)) {
  571. return -1;
  572. }
  573. ao2_link(app_controls, control);
  574. return 0;
  575. }
  576. struct ast_channel *stasis_app_bridge_playback_channel_find(struct ast_bridge *bridge)
  577. {
  578. struct stasis_app_bridge_channel_wrapper *playback_wrapper;
  579. struct ast_channel *chan;
  580. playback_wrapper = ao2_find(app_bridges_playback, bridge->uniqueid, OBJ_SEARCH_KEY);
  581. if (!playback_wrapper) {
  582. return NULL;
  583. }
  584. chan = ast_channel_get_by_name(playback_wrapper->channel_id);
  585. ao2_ref(playback_wrapper, -1);
  586. return chan;
  587. }
  588. struct ast_bridge *stasis_app_bridge_find_by_id(
  589. const char *bridge_id)
  590. {
  591. return ao2_find(app_bridges, bridge_id, OBJ_SEARCH_KEY);
  592. }
  593. /*!
  594. * \brief In addition to running ao2_cleanup(), this function also removes the
  595. * object from the app_controls container.
  596. */
  597. static void control_unlink(struct stasis_app_control *control)
  598. {
  599. if (!control) {
  600. return;
  601. }
  602. ao2_unlink(app_controls, control);
  603. ao2_cleanup(control);
  604. }
  605. struct ast_bridge *stasis_app_bridge_create(const char *type, const char *name, const char *id)
  606. {
  607. struct ast_bridge *bridge;
  608. char *requested_type, *requested_types = ast_strdupa(S_OR(type, "mixing"));
  609. int capabilities = 0;
  610. int flags = AST_BRIDGE_FLAG_MERGE_INHIBIT_FROM | AST_BRIDGE_FLAG_MERGE_INHIBIT_TO
  611. | AST_BRIDGE_FLAG_SWAP_INHIBIT_FROM | AST_BRIDGE_FLAG_SWAP_INHIBIT_TO
  612. | AST_BRIDGE_FLAG_TRANSFER_BRIDGE_ONLY;
  613. while ((requested_type = strsep(&requested_types, ","))) {
  614. requested_type = ast_strip(requested_type);
  615. if (!strcmp(requested_type, "mixing")) {
  616. capabilities |= STASIS_BRIDGE_MIXING_CAPABILITIES;
  617. flags |= AST_BRIDGE_FLAG_SMART;
  618. } else if (!strcmp(requested_type, "holding")) {
  619. capabilities |= AST_BRIDGE_CAPABILITY_HOLDING;
  620. } else if (!strcmp(requested_type, "dtmf_events") ||
  621. !strcmp(requested_type, "proxy_media")) {
  622. capabilities &= ~AST_BRIDGE_CAPABILITY_NATIVE;
  623. }
  624. }
  625. if (!capabilities
  626. /* Holding and mixing capabilities don't mix. */
  627. || ((capabilities & AST_BRIDGE_CAPABILITY_HOLDING)
  628. && (capabilities & (STASIS_BRIDGE_MIXING_CAPABILITIES)))) {
  629. return NULL;
  630. }
  631. bridge = bridge_stasis_new(capabilities, flags, name, id);
  632. if (bridge) {
  633. if (!ao2_link(app_bridges, bridge)) {
  634. ast_bridge_destroy(bridge, 0);
  635. bridge = NULL;
  636. }
  637. }
  638. return bridge;
  639. }
  640. void stasis_app_bridge_destroy(const char *bridge_id)
  641. {
  642. struct ast_bridge *bridge = stasis_app_bridge_find_by_id(bridge_id);
  643. if (!bridge) {
  644. return;
  645. }
  646. ao2_unlink(app_bridges, bridge);
  647. ast_bridge_destroy(bridge, 0);
  648. }
  649. struct replace_channel_store {
  650. struct ast_channel_snapshot *snapshot;
  651. char *app;
  652. };
  653. static void replace_channel_destroy(void *obj)
  654. {
  655. struct replace_channel_store *replace = obj;
  656. ao2_cleanup(replace->snapshot);
  657. ast_free(replace->app);
  658. ast_free(replace);
  659. }
  660. static const struct ast_datastore_info replace_channel_store_info = {
  661. .type = "replace-channel-store",
  662. .destroy = replace_channel_destroy,
  663. };
  664. static struct replace_channel_store *get_replace_channel_store(struct ast_channel *chan, int no_create)
  665. {
  666. struct ast_datastore *datastore;
  667. SCOPED_CHANNELLOCK(lock, chan);
  668. datastore = ast_channel_datastore_find(chan, &replace_channel_store_info, NULL);
  669. if (!datastore) {
  670. if (no_create) {
  671. return NULL;
  672. }
  673. datastore = ast_datastore_alloc(&replace_channel_store_info, NULL);
  674. if (!datastore) {
  675. return NULL;
  676. }
  677. ast_channel_datastore_add(chan, datastore);
  678. }
  679. if (!datastore->data) {
  680. datastore->data = ast_calloc(1, sizeof(struct replace_channel_store));
  681. }
  682. return datastore->data;
  683. }
  684. int app_set_replace_channel_snapshot(struct ast_channel *chan, struct ast_channel_snapshot *replace_snapshot)
  685. {
  686. struct replace_channel_store *replace = get_replace_channel_store(chan, 0);
  687. if (!replace) {
  688. return -1;
  689. }
  690. ao2_replace(replace->snapshot, replace_snapshot);
  691. return 0;
  692. }
  693. int app_set_replace_channel_app(struct ast_channel *chan, const char *replace_app)
  694. {
  695. struct replace_channel_store *replace = get_replace_channel_store(chan, 0);
  696. if (!replace) {
  697. return -1;
  698. }
  699. ast_free(replace->app);
  700. replace->app = NULL;
  701. if (replace_app) {
  702. replace->app = ast_strdup(replace_app);
  703. if (!replace->app) {
  704. return -1;
  705. }
  706. }
  707. return 0;
  708. }
  709. static struct ast_channel_snapshot *get_replace_channel_snapshot(struct ast_channel *chan)
  710. {
  711. struct replace_channel_store *replace = get_replace_channel_store(chan, 1);
  712. struct ast_channel_snapshot *replace_channel_snapshot;
  713. if (!replace) {
  714. return NULL;
  715. }
  716. replace_channel_snapshot = replace->snapshot;
  717. replace->snapshot = NULL;
  718. return replace_channel_snapshot;
  719. }
  720. char *app_get_replace_channel_app(struct ast_channel *chan)
  721. {
  722. struct replace_channel_store *replace = get_replace_channel_store(chan, 1);
  723. char *replace_channel_app;
  724. if (!replace) {
  725. return NULL;
  726. }
  727. replace_channel_app = replace->app;
  728. replace->app = NULL;
  729. return replace_channel_app;
  730. }
  731. static int send_start_msg_snapshots(struct stasis_app *app,
  732. int argc, char *argv[], struct ast_channel_snapshot *snapshot,
  733. struct ast_channel_snapshot *replace_channel_snapshot)
  734. {
  735. RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
  736. struct ast_json *json_args;
  737. struct stasis_message_sanitizer *sanitize = stasis_app_get_sanitizer();
  738. int i;
  739. if (sanitize && sanitize->channel_snapshot
  740. && sanitize->channel_snapshot(snapshot)) {
  741. return 0;
  742. }
  743. msg = ast_json_pack("{s: s, s: o, s: [], s: o}",
  744. "type", "StasisStart",
  745. "timestamp", ast_json_timeval(ast_tvnow(), NULL),
  746. "args",
  747. "channel", ast_channel_snapshot_to_json(snapshot, NULL));
  748. if (!msg) {
  749. return -1;
  750. }
  751. if (replace_channel_snapshot) {
  752. int res = ast_json_object_set(msg, "replace_channel",
  753. ast_channel_snapshot_to_json(replace_channel_snapshot, NULL));
  754. if (res) {
  755. return -1;
  756. }
  757. }
  758. /* Append arguments to args array */
  759. json_args = ast_json_object_get(msg, "args");
  760. ast_assert(json_args != NULL);
  761. for (i = 0; i < argc; ++i) {
  762. int r = ast_json_array_append(json_args,
  763. ast_json_string_create(argv[i]));
  764. if (r != 0) {
  765. ast_log(LOG_ERROR, "Error appending start message\n");
  766. return -1;
  767. }
  768. }
  769. app_send(app, msg);
  770. return 0;
  771. }
  772. static int send_start_msg(struct stasis_app *app, struct ast_channel *chan,
  773. int argc, char *argv[])
  774. {
  775. RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
  776. RAII_VAR(struct ast_channel_snapshot *, replace_channel_snapshot,
  777. NULL, ao2_cleanup);
  778. ast_assert(chan != NULL);
  779. replace_channel_snapshot = get_replace_channel_snapshot(chan);
  780. /* Set channel info */
  781. ast_channel_lock(chan);
  782. snapshot = ast_channel_snapshot_create(chan);
  783. ast_channel_unlock(chan);
  784. if (!snapshot) {
  785. return -1;
  786. }
  787. return send_start_msg_snapshots(app, argc, argv, snapshot, replace_channel_snapshot);
  788. }
  789. static int send_end_msg_snapshot(struct stasis_app *app, struct ast_channel_snapshot *snapshot)
  790. {
  791. struct stasis_message_sanitizer *sanitize = stasis_app_get_sanitizer();
  792. struct ast_json *msg;
  793. if (sanitize && sanitize->channel_snapshot
  794. && sanitize->channel_snapshot(snapshot)) {
  795. return 0;
  796. }
  797. msg = stasis_end_json_payload(snapshot, sanitize);
  798. if (!msg) {
  799. return -1;
  800. }
  801. app_send(app, msg);
  802. ast_json_unref(msg);
  803. return 0;
  804. }
  805. static void remove_masquerade_store(struct ast_channel *chan);
  806. static int masq_match_cb(void *obj, void *data, int flags)
  807. {
  808. struct stasis_app_control *control = obj;
  809. struct ast_channel *chan = data;
  810. if (!strcmp(ast_channel_uniqueid(chan),
  811. stasis_app_control_get_channel_id(control))) {
  812. return CMP_MATCH;
  813. }
  814. return 0;
  815. }
  816. static void channel_stolen_cb(void *data, struct ast_channel *old_chan, struct ast_channel *new_chan)
  817. {
  818. struct ast_channel_snapshot *snapshot;
  819. struct stasis_app_control *control;
  820. /* grab a snapshot */
  821. snapshot = ast_channel_snapshot_get_latest(ast_channel_uniqueid(new_chan));
  822. if (!snapshot) {
  823. ast_log(LOG_ERROR, "Could not get snapshot for masqueraded channel\n");
  824. return;
  825. }
  826. /* find control */
  827. control = ao2_callback(app_controls, 0, masq_match_cb, old_chan);
  828. if (!control) {
  829. ast_log(LOG_ERROR, "Could not find control for masqueraded channel\n");
  830. ao2_cleanup(snapshot);
  831. return;
  832. }
  833. /* send the StasisEnd message to the app */
  834. send_end_msg_snapshot(control_app(control), snapshot);
  835. /* remove the datastore */
  836. remove_masquerade_store(old_chan);
  837. ao2_cleanup(control);
  838. ao2_cleanup(snapshot);
  839. }
  840. static void channel_replaced_cb(void *data, struct ast_channel *old_chan, struct ast_channel *new_chan)
  841. {
  842. RAII_VAR(struct ast_channel_snapshot *, new_snapshot, NULL, ao2_cleanup);
  843. RAII_VAR(struct ast_channel_snapshot *, old_snapshot, NULL, ao2_cleanup);
  844. struct stasis_app_control *control;
  845. /* At this point, new_chan is the channel pointer that is in Stasis() and
  846. * has the unknown channel's name in it while old_chan is the channel pointer
  847. * that is not in Stasis(), but has the guts of the channel that Stasis() knows
  848. * about */
  849. /* grab a snapshot for the channel that is jumping into Stasis() */
  850. new_snapshot = ast_channel_snapshot_get_latest(ast_channel_uniqueid(new_chan));
  851. if (!new_snapshot) {
  852. ast_log(LOG_ERROR, "Could not get snapshot for masquerading channel\n");
  853. return;
  854. }
  855. /* grab a snapshot for the channel that has been kicked out of Stasis() */
  856. old_snapshot = ast_channel_snapshot_get_latest(ast_channel_uniqueid(old_chan));
  857. if (!old_snapshot) {
  858. ast_log(LOG_ERROR, "Could not get snapshot for masqueraded channel\n");
  859. return;
  860. }
  861. /* find, unlink, and relink control since the channel has a new name and
  862. * its hash has likely changed */
  863. control = ao2_callback(app_controls, OBJ_UNLINK, masq_match_cb, new_chan);
  864. if (!control) {
  865. ast_log(LOG_ERROR, "Could not find control for masquerading channel\n");
  866. return;
  867. }
  868. ao2_link(app_controls, control);
  869. /* send the StasisStart with replace_channel to the app */
  870. send_start_msg_snapshots(control_app(control), 0, NULL, new_snapshot,
  871. old_snapshot);
  872. /* send the StasisEnd message to the app */
  873. send_end_msg_snapshot(control_app(control), old_snapshot);
  874. /* fixup channel topic forwards */
  875. if (app_replace_channel_forwards(control_app(control), old_snapshot->uniqueid, new_chan)) {
  876. ast_log(LOG_ERROR, "Failed to fixup channel topic forwards for %s(%s) owned by %s\n",
  877. old_snapshot->name, old_snapshot->uniqueid, app_name(control_app(control)));
  878. }
  879. ao2_cleanup(control);
  880. }
  881. static const struct ast_datastore_info masquerade_store_info = {
  882. .type = "stasis-masqerade",
  883. .chan_fixup = channel_stolen_cb,
  884. .chan_breakdown = channel_replaced_cb,
  885. };
  886. static int has_masquerade_store(struct ast_channel *chan)
  887. {
  888. SCOPED_CHANNELLOCK(lock, chan);
  889. return !!ast_channel_datastore_find(chan, &masquerade_store_info, NULL);
  890. }
  891. static int add_masquerade_store(struct ast_channel *chan)
  892. {
  893. struct ast_datastore *datastore;
  894. SCOPED_CHANNELLOCK(lock, chan);
  895. if (ast_channel_datastore_find(chan, &masquerade_store_info, NULL)) {
  896. return 0;
  897. }
  898. datastore = ast_datastore_alloc(&masquerade_store_info, NULL);
  899. if (!datastore) {
  900. return -1;
  901. }
  902. ast_channel_datastore_add(chan, datastore);
  903. return 0;
  904. }
  905. static void remove_masquerade_store(struct ast_channel *chan)
  906. {
  907. struct ast_datastore *datastore;
  908. SCOPED_CHANNELLOCK(lock, chan);
  909. datastore = ast_channel_datastore_find(chan, &masquerade_store_info, NULL);
  910. if (!datastore) {
  911. return;
  912. }
  913. ast_channel_datastore_remove(chan, datastore);
  914. ast_datastore_free(datastore);
  915. }
  916. static int send_end_msg(struct stasis_app *app, struct ast_channel *chan)
  917. {
  918. struct ast_channel_snapshot *snapshot;
  919. int res = 0;
  920. ast_assert(chan != NULL);
  921. /* A masquerade has occurred and this message will be wrong so it
  922. * has already been sent elsewhere. */
  923. if (!has_masquerade_store(chan)) {
  924. return 0;
  925. }
  926. /* Set channel info */
  927. snapshot = ast_channel_snapshot_get_latest(ast_channel_uniqueid(chan));
  928. if (!snapshot) {
  929. return -1;
  930. }
  931. if (send_end_msg_snapshot(app, snapshot)) {
  932. res = -1;
  933. }
  934. ao2_cleanup(snapshot);
  935. return res;
  936. }
  937. void stasis_app_control_execute_until_exhausted(struct ast_channel *chan, struct stasis_app_control *control)
  938. {
  939. while (!control_is_done(control)) {
  940. int command_count;
  941. command_count = control_dispatch_all(control, chan);
  942. ao2_lock(control);
  943. if (control_command_count(control)) {
  944. /* If the command queue isn't empty, something added to the queue before it was locked. */
  945. ao2_unlock(control);
  946. continue;
  947. }
  948. if (command_count == 0 || ast_channel_fdno(chan) == -1) {
  949. control_mark_done(control);
  950. ao2_unlock(control);
  951. break;
  952. }
  953. ao2_unlock(control);
  954. }
  955. }
  956. int stasis_app_control_is_done(struct stasis_app_control *control)
  957. {
  958. return control_is_done(control);
  959. }
  960. struct ast_datastore_info set_end_published_info = {
  961. .type = "stasis_end_published",
  962. };
  963. void stasis_app_channel_set_stasis_end_published(struct ast_channel *chan)
  964. {
  965. struct ast_datastore *datastore;
  966. datastore = ast_datastore_alloc(&set_end_published_info, NULL);
  967. ast_channel_lock(chan);
  968. ast_channel_datastore_add(chan, datastore);
  969. ast_channel_unlock(chan);
  970. }
  971. int stasis_app_channel_is_stasis_end_published(struct ast_channel *chan)
  972. {
  973. struct ast_datastore *datastore;
  974. ast_channel_lock(chan);
  975. datastore = ast_channel_datastore_find(chan, &set_end_published_info, NULL);
  976. ast_channel_unlock(chan);
  977. return datastore ? 1 : 0;
  978. }
  979. static void remove_stasis_end_published(struct ast_channel *chan)
  980. {
  981. struct ast_datastore *datastore;
  982. ast_channel_lock(chan);
  983. datastore = ast_channel_datastore_find(chan, &set_end_published_info, NULL);
  984. ast_channel_unlock(chan);
  985. if (datastore) {
  986. ast_channel_datastore_remove(chan, datastore);
  987. ast_datastore_free(datastore);
  988. }
  989. }
  990. /*! /brief Stasis dialplan application callback */
  991. int stasis_app_exec(struct ast_channel *chan, const char *app_name, int argc,
  992. char *argv[])
  993. {
  994. SCOPED_MODULE_USE(ast_module_info->self);
  995. RAII_VAR(struct stasis_app *, app, NULL, ao2_cleanup);
  996. RAII_VAR(struct stasis_app_control *, control, NULL, control_unlink);
  997. struct ast_bridge *bridge = NULL;
  998. int res = 0;
  999. int needs_depart;
  1000. ast_assert(chan != NULL);
  1001. /* Just in case there's a lingering indication that the channel has had a stasis
  1002. * end published on it, remove that now.
  1003. */
  1004. remove_stasis_end_published(chan);
  1005. app = ao2_find(apps_registry, app_name, OBJ_SEARCH_KEY);
  1006. if (!app) {
  1007. ast_log(LOG_ERROR,
  1008. "Stasis app '%s' not registered\n", app_name);
  1009. return -1;
  1010. }
  1011. if (!app_is_active(app)) {
  1012. ast_log(LOG_ERROR,
  1013. "Stasis app '%s' not active\n", app_name);
  1014. return -1;
  1015. }
  1016. control = control_create(chan, app);
  1017. if (!control) {
  1018. ast_log(LOG_ERROR, "Allocated failed\n");
  1019. return -1;
  1020. }
  1021. ao2_link(app_controls, control);
  1022. if (add_masquerade_store(chan)) {
  1023. ast_log(LOG_ERROR, "Failed to attach masquerade detector\n");
  1024. return -1;
  1025. }
  1026. res = send_start_msg(app, chan, argc, argv);
  1027. if (res != 0) {
  1028. ast_log(LOG_ERROR,
  1029. "Error sending start message to '%s'\n", app_name);
  1030. remove_masquerade_store(chan);
  1031. return -1;
  1032. }
  1033. res = app_subscribe_channel(app, chan);
  1034. if (res != 0) {
  1035. ast_log(LOG_ERROR, "Error subscribing app '%s' to channel '%s'\n",
  1036. app_name, ast_channel_name(chan));
  1037. remove_masquerade_store(chan);
  1038. return -1;
  1039. }
  1040. /* Pull queued prestart commands and execute */
  1041. control_prestart_dispatch_all(control, chan);
  1042. while (!control_is_done(control)) {
  1043. RAII_VAR(struct ast_frame *, f, NULL, ast_frame_dtor);
  1044. int r;
  1045. int command_count;
  1046. RAII_VAR(struct ast_bridge *, last_bridge, NULL, ao2_cleanup);
  1047. /* Check to see if a bridge absorbed our hangup frame */
  1048. if (ast_check_hangup_locked(chan)) {
  1049. break;
  1050. }
  1051. last_bridge = bridge;
  1052. bridge = ao2_bump(stasis_app_get_bridge(control));
  1053. if (bridge != last_bridge) {
  1054. app_unsubscribe_bridge(app, last_bridge);
  1055. app_subscribe_bridge(app, bridge);
  1056. }
  1057. if (bridge) {
  1058. /* Bridge is handling channel frames */
  1059. control_wait(control);
  1060. control_dispatch_all(control, chan);
  1061. continue;
  1062. }
  1063. r = ast_waitfor(chan, MAX_WAIT_MS);
  1064. if (r < 0) {
  1065. ast_debug(3, "%s: Poll error\n",
  1066. ast_channel_uniqueid(chan));
  1067. break;
  1068. }
  1069. command_count = control_dispatch_all(control, chan);
  1070. if (command_count > 0 && ast_channel_fdno(chan) == -1) {
  1071. /* Command drained the channel; wait for next frame */
  1072. continue;
  1073. }
  1074. if (r == 0) {
  1075. /* Timeout */
  1076. continue;
  1077. }
  1078. f = ast_read(chan);
  1079. if (!f) {
  1080. /* Continue on in the dialplan */
  1081. ast_debug(3, "%s: Hangup (no more frames)\n",
  1082. ast_channel_uniqueid(chan));
  1083. break;
  1084. }
  1085. if (f->frametype == AST_FRAME_CONTROL) {
  1086. if (f->subclass.integer == AST_CONTROL_HANGUP) {
  1087. /* Continue on in the dialplan */
  1088. ast_debug(3, "%s: Hangup\n",
  1089. ast_channel_uniqueid(chan));
  1090. break;
  1091. }
  1092. }
  1093. }
  1094. ast_channel_lock(chan);
  1095. needs_depart = ast_channel_is_bridged(chan);
  1096. ast_channel_unlock(chan);
  1097. if (needs_depart) {
  1098. ast_bridge_depart(chan);
  1099. }
  1100. app_unsubscribe_bridge(app, stasis_app_get_bridge(control));
  1101. ao2_cleanup(bridge);
  1102. /* Only publish a stasis_end event if it hasn't already been published */
  1103. if (!stasis_app_channel_is_stasis_end_published(chan)) {
  1104. app_unsubscribe_channel(app, chan);
  1105. res = send_end_msg(app, chan);
  1106. remove_masquerade_store(chan);
  1107. if (res != 0) {
  1108. ast_log(LOG_ERROR,
  1109. "Error sending end message to %s\n", app_name);
  1110. return res;
  1111. }
  1112. } else {
  1113. remove_stasis_end_published(chan);
  1114. }
  1115. /* There's an off chance that app is ready for cleanup. Go ahead
  1116. * and clean up, just in case
  1117. */
  1118. cleanup();
  1119. /* The control needs to be removed from the controls container in
  1120. * case a new PBX is started and ends up coming back into Stasis.
  1121. */
  1122. ao2_cleanup(app);
  1123. app = NULL;
  1124. control_unlink(control);
  1125. control = NULL;
  1126. if (!ast_check_hangup_locked(chan) && !ast_channel_pbx(chan)) {
  1127. struct ast_pbx_args pbx_args;
  1128. memset(&pbx_args, 0, sizeof(pbx_args));
  1129. pbx_args.no_hangup_chan = 1;
  1130. res = ast_pbx_run_args(chan, &pbx_args);
  1131. }
  1132. return res;
  1133. }
  1134. int stasis_app_send(const char *app_name, struct ast_json *message)
  1135. {
  1136. RAII_VAR(struct stasis_app *, app, NULL, ao2_cleanup);
  1137. app = ao2_find(apps_registry, app_name, OBJ_SEARCH_KEY);
  1138. if (!app) {
  1139. /* XXX We can do a better job handling late binding, queueing up
  1140. * the call for a few seconds to wait for the app to register.
  1141. */
  1142. ast_log(LOG_WARNING,
  1143. "Stasis app '%s' not registered\n", app_name);
  1144. return -1;
  1145. }
  1146. app_send(app, message);
  1147. return 0;
  1148. }
  1149. static struct stasis_app *find_app_by_name(const char *app_name)
  1150. {
  1151. struct stasis_app *res = NULL;
  1152. if (!ast_strlen_zero(app_name)) {
  1153. res = ao2_find(apps_registry, app_name, OBJ_SEARCH_KEY);
  1154. }
  1155. if (!res) {
  1156. ast_log(LOG_WARNING, "Could not find app '%s'\n",
  1157. app_name ? : "(null)");
  1158. }
  1159. return res;
  1160. }
  1161. static int append_name(void *obj, void *arg, int flags)
  1162. {
  1163. struct stasis_app *app = obj;
  1164. struct ao2_container *apps = arg;
  1165. ast_str_container_add(apps, stasis_app_name(app));
  1166. return 0;
  1167. }
  1168. struct ao2_container *stasis_app_get_all(void)
  1169. {
  1170. RAII_VAR(struct ao2_container *, apps, NULL, ao2_cleanup);
  1171. apps = ast_str_container_alloc(1);
  1172. if (!apps) {
  1173. return NULL;
  1174. }
  1175. ao2_callback(apps_registry, OBJ_NODATA, append_name, apps);
  1176. return ao2_bump(apps);
  1177. }
  1178. int stasis_app_register(const char *app_name, stasis_app_cb handler, void *data)
  1179. {
  1180. RAII_VAR(struct stasis_app *, app, NULL, ao2_cleanup);
  1181. SCOPED_LOCK(apps_lock, apps_registry, ao2_lock, ao2_unlock);
  1182. app = ao2_find(apps_registry, app_name, OBJ_SEARCH_KEY | OBJ_NOLOCK);
  1183. if (app) {
  1184. app_update(app, handler, data);
  1185. } else {
  1186. app = app_create(app_name, handler, data);
  1187. if (app) {
  1188. ao2_link_flags(apps_registry, app, OBJ_NOLOCK);
  1189. } else {
  1190. return -1;
  1191. }
  1192. }
  1193. /* We lazily clean up the apps_registry, because it's good enough to
  1194. * prevent memory leaks, and we're lazy.
  1195. */
  1196. cleanup();
  1197. return 0;
  1198. }
  1199. void stasis_app_unregister(const char *app_name)
  1200. {
  1201. RAII_VAR(struct stasis_app *, app, NULL, ao2_cleanup);
  1202. if (!app_name) {
  1203. return;
  1204. }
  1205. app = ao2_find(apps_registry, app_name, OBJ_SEARCH_KEY);
  1206. if (!app) {
  1207. ast_log(LOG_ERROR,
  1208. "Stasis app '%s' not registered\n", app_name);
  1209. return;
  1210. }
  1211. app_deactivate(app);
  1212. /* There's a decent chance that app is ready for cleanup. Go ahead
  1213. * and clean up, just in case
  1214. */
  1215. cleanup();
  1216. }
  1217. /*!
  1218. * \internal \brief List of registered event sources.
  1219. */
  1220. AST_RWLIST_HEAD_STATIC(event_sources, stasis_app_event_source);
  1221. void stasis_app_register_event_source(struct stasis_app_event_source *obj)
  1222. {
  1223. SCOPED_LOCK(lock, &event_sources, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
  1224. AST_LIST_INSERT_TAIL(&event_sources, obj, next);
  1225. /* only need to bump the module ref on non-core sources because the
  1226. core ones are [un]registered by this module. */
  1227. if (!stasis_app_is_core_event_source(obj)) {
  1228. ast_module_ref(ast_module_info->self);
  1229. }
  1230. }
  1231. void stasis_app_unregister_event_source(struct stasis_app_event_source *obj)
  1232. {
  1233. struct stasis_app_event_source *source;
  1234. SCOPED_LOCK(lock, &event_sources, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
  1235. AST_RWLIST_TRAVERSE_SAFE_BEGIN(&event_sources, source, next) {
  1236. if (source == obj) {
  1237. AST_RWLIST_REMOVE_CURRENT(next);
  1238. if (!stasis_app_is_core_event_source(obj)) {
  1239. ast_module_unref(ast_module_info->self);
  1240. }
  1241. break;
  1242. }
  1243. }
  1244. AST_RWLIST_TRAVERSE_SAFE_END;
  1245. }
  1246. /*!
  1247. * \internal
  1248. * \brief Convert event source data to JSON.
  1249. *
  1250. * Calls each event source that has a "to_json" handler allowing each
  1251. * source to add data to the given JSON object.
  1252. *
  1253. * \param app application associated with the event source
  1254. * \param json a json object to "fill"
  1255. *
  1256. * \retval The given json object.
  1257. */
  1258. static struct ast_json *app_event_sources_to_json(
  1259. const struct stasis_app *app, struct ast_json *json)
  1260. {
  1261. struct stasis_app_event_source *source;
  1262. SCOPED_LOCK(lock, &event_sources, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
  1263. AST_LIST_TRAVERSE(&event_sources, source, next) {
  1264. if (source->to_json) {
  1265. source->to_json(app, json);
  1266. }
  1267. }
  1268. return json;
  1269. }
  1270. static struct ast_json *stasis_app_object_to_json(struct stasis_app *app)
  1271. {
  1272. if (!app) {
  1273. return NULL;
  1274. }
  1275. return app_event_sources_to_json(app, app_to_json(app));
  1276. }
  1277. struct ast_json *stasis_app_to_json(const char *app_name)
  1278. {
  1279. RAII_VAR(struct stasis_app *, app, find_app_by_name(app_name), ao2_cleanup);
  1280. return stasis_app_object_to_json(app);
  1281. }
  1282. /*!
  1283. * \internal
  1284. * \brief Finds an event source that matches a uri scheme.
  1285. *
  1286. * Uri(s) should begin with a particular scheme that can be matched
  1287. * against an event source.
  1288. *
  1289. * \param uri uri containing a scheme to match
  1290. *
  1291. * \retval an event source if found, NULL otherwise.
  1292. */
  1293. static struct stasis_app_event_source *app_event_source_find(const char *uri)
  1294. {
  1295. struct stasis_app_event_source *source;
  1296. SCOPED_LOCK(lock, &event_sources, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
  1297. AST_LIST_TRAVERSE(&event_sources, source, next) {
  1298. if (ast_begins_with(uri, source->scheme)) {
  1299. return source;
  1300. }
  1301. }
  1302. return NULL;
  1303. }
  1304. /*!
  1305. * \internal
  1306. * \brief Callback for subscription handling
  1307. *
  1308. * \param app [un]subscribing application
  1309. * \param uri scheme:id of an event source
  1310. * \param event_source being [un]subscribed [from]to
  1311. *
  1312. * \retval stasis_app_subscribe_res return code.
  1313. */
  1314. typedef enum stasis_app_subscribe_res (*app_subscription_handler)(
  1315. struct stasis_app *app, const char *uri,
  1316. struct stasis_app_event_source *event_source);
  1317. /*!
  1318. * \internal
  1319. * \brief Subscriptions handler for application [un]subscribing.
  1320. *
  1321. * \param app_name Name of the application to subscribe.
  1322. * \param event_source_uris URIs for the event sources to subscribe to.
  1323. * \param event_sources_count Array size of event_source_uris.
  1324. * \param json Optional output pointer for JSON representation of the app
  1325. * after adding the subscription.
  1326. * \param handler [un]subscribe handler
  1327. *
  1328. * \retval stasis_app_subscribe_res return code.
  1329. */
  1330. static enum stasis_app_subscribe_res app_handle_subscriptions(
  1331. const char *app_name, const char **event_source_uris,
  1332. int event_sources_count, struct ast_json **json,
  1333. app_subscription_handler handler)
  1334. {
  1335. RAII_VAR(struct stasis_app *, app, find_app_by_name(app_name), ao2_cleanup);
  1336. int i;
  1337. if (!app) {
  1338. return STASIS_ASR_APP_NOT_FOUND;
  1339. }
  1340. for (i = 0; i < event_sources_count; ++i) {
  1341. const char *uri = event_source_uris[i];
  1342. enum stasis_app_subscribe_res res = STASIS_ASR_INTERNAL_ERROR;
  1343. struct stasis_app_event_source *event_source;
  1344. if (!(event_source = app_event_source_find(uri))) {
  1345. ast_log(LOG_WARNING, "Invalid scheme: %s\n", uri);
  1346. return STASIS_ASR_EVENT_SOURCE_BAD_SCHEME;
  1347. }
  1348. if (handler &&
  1349. ((res = handler(app, uri, event_source)))) {
  1350. return res;
  1351. }
  1352. }
  1353. if (json) {
  1354. ast_debug(3, "%s: Successful; setting results\n", app_name);
  1355. *json = stasis_app_object_to_json(app);
  1356. }
  1357. return STASIS_ASR_OK;
  1358. }
  1359. enum stasis_app_subscribe_res stasis_app_subscribe_channel(const char *app_name,
  1360. struct ast_channel *chan)
  1361. {
  1362. RAII_VAR(struct stasis_app *, app, find_app_by_name(app_name), ao2_cleanup);
  1363. int res;
  1364. if (!app) {
  1365. return STASIS_ASR_APP_NOT_FOUND;
  1366. }
  1367. ast_debug(3, "%s: Subscribing to %s\n", app_name, ast_channel_uniqueid(chan));
  1368. res = app_subscribe_channel(app, chan);
  1369. if (res != 0) {
  1370. ast_log(LOG_ERROR, "Error subscribing app '%s' to channel '%s'\n",
  1371. app_name, ast_channel_uniqueid(chan));
  1372. return STASIS_ASR_INTERNAL_ERROR;
  1373. }
  1374. return STASIS_ASR_OK;
  1375. }
  1376. /*!
  1377. * \internal
  1378. * \brief Subscribe an app to an event source.
  1379. *
  1380. * \param app subscribing application
  1381. * \param uri scheme:id of an event source
  1382. * \param event_source being subscribed to
  1383. *
  1384. * \retval stasis_app_subscribe_res return code.
  1385. */
  1386. static enum stasis_app_subscribe_res app_subscribe(
  1387. struct stasis_app *app, const char *uri,
  1388. struct stasis_app_event_source *event_source)
  1389. {
  1390. const char *app_name = stasis_app_name(app);
  1391. RAII_VAR(void *, obj, NULL, ao2_cleanup);
  1392. ast_debug(3, "%s: Checking %s\n", app_name, uri);
  1393. if (!event_source->find ||
  1394. (!(obj = event_source->find(app, uri + strlen(event_source->scheme))))) {
  1395. ast_log(LOG_WARNING, "Event source not found: %s\n", uri);
  1396. return STASIS_ASR_EVENT_SOURCE_NOT_FOUND;
  1397. }
  1398. ast_debug(3, "%s: Subscribing to %s\n", app_name, uri);
  1399. if (!event_source->subscribe || (event_source->subscribe(app, obj))) {
  1400. ast_log(LOG_WARNING, "Error subscribing app '%s' to '%s'\n",
  1401. app_name, uri);
  1402. return STASIS_ASR_INTERNAL_ERROR;
  1403. }
  1404. return STASIS_ASR_OK;
  1405. }
  1406. enum stasis_app_subscribe_res stasis_app_subscribe(const char *app_name,
  1407. const char **event_source_uris, int event_sources_count,
  1408. struct ast_json **json)
  1409. {
  1410. return app_handle_subscriptions(
  1411. app_name, event_source_uris, event_sources_count,
  1412. json, app_subscribe);
  1413. }
  1414. /*!
  1415. * \internal
  1416. * \brief Unsubscribe an app from an event source.
  1417. *
  1418. * \param app application to unsubscribe
  1419. * \param uri scheme:id of an event source
  1420. * \param event_source being unsubscribed from
  1421. *
  1422. * \retval stasis_app_subscribe_res return code.
  1423. */
  1424. static enum stasis_app_subscribe_res app_unsubscribe(
  1425. struct stasis_app *app, const char *uri,
  1426. struct stasis_app_event_source *event_source)
  1427. {
  1428. const char *app_name = stasis_app_name(app);
  1429. const char *id = uri + strlen(event_source->scheme);
  1430. if (!event_source->is_subscribed ||
  1431. (!event_source->is_subscribed(app, id))) {
  1432. return STASIS_ASR_EVENT_SOURCE_NOT_FOUND;
  1433. }
  1434. ast_debug(3, "%s: Unsubscribing from %s\n", app_name, uri);
  1435. if (!event_source->unsubscribe || (event_source->unsubscribe(app, id))) {
  1436. ast_log(LOG_WARNING, "Error unsubscribing app '%s' to '%s'\n",
  1437. app_name, uri);
  1438. return -1;
  1439. }
  1440. return 0;
  1441. }
  1442. enum stasis_app_subscribe_res stasis_app_unsubscribe(const char *app_name,
  1443. const char **event_source_uris, int event_sources_count,
  1444. struct ast_json **json)
  1445. {
  1446. return app_handle_subscriptions(
  1447. app_name, event_source_uris, event_sources_count,
  1448. json, app_unsubscribe);
  1449. }
  1450. enum stasis_app_user_event_res stasis_app_user_event(const char *app_name,
  1451. const char *event_name,
  1452. const char **source_uris, int sources_count,
  1453. struct ast_json *json_variables)
  1454. {
  1455. RAII_VAR(struct stasis_app *, app, find_app_by_name(app_name), ao2_cleanup);
  1456. RAII_VAR(struct ast_json *, blob, NULL, ast_json_unref);
  1457. RAII_VAR(struct ast_multi_object_blob *, multi, NULL, ao2_cleanup);
  1458. RAII_VAR(void *, obj, NULL, ao2_cleanup);
  1459. RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
  1460. enum stasis_app_subscribe_res res = STASIS_APP_USER_INTERNAL_ERROR;
  1461. struct ast_json *json_value;
  1462. int have_channel = 0;
  1463. int i;
  1464. if (!app) {
  1465. ast_log(LOG_WARNING, "App %s not found\n", app_name);
  1466. return STASIS_APP_USER_APP_NOT_FOUND;
  1467. }
  1468. if (!ast_multi_user_event_type()) {
  1469. return res;
  1470. }
  1471. blob = json_variables;
  1472. if (!blob) {
  1473. blob = ast_json_pack("{}");
  1474. }
  1475. json_value = ast_json_string_create(event_name);
  1476. if (!json_value) {
  1477. ast_log(LOG_ERROR, "unable to create json string\n");
  1478. return res;
  1479. }
  1480. if (ast_json_object_set(blob, "eventname", json_value)) {
  1481. ast_log(LOG_ERROR, "unable to set eventname to blob\n");
  1482. return res;
  1483. }
  1484. multi = ast_multi_object_blob_create(blob);
  1485. for (i = 0; i < sources_count; ++i) {
  1486. const char *uri = source_uris[i];
  1487. void *snapshot=NULL;
  1488. enum stasis_user_multi_object_snapshot_type type;
  1489. if (ast_begins_with(uri, "channel:")) {
  1490. type = STASIS_UMOS_CHANNEL;
  1491. snapshot = ast_channel_snapshot_get_latest(uri + 8);
  1492. have_channel = 1;
  1493. } else if (ast_begins_with(uri, "bridge:")) {
  1494. type = STASIS_UMOS_BRIDGE;
  1495. snapshot = ast_bridge_snapshot_get_latest(uri + 7);
  1496. } else if (ast_begins_with(uri, "endpoint:")) {
  1497. type = STASIS_UMOS_ENDPOINT;
  1498. snapshot = ast_endpoint_latest_snapshot(uri + 9, NULL);
  1499. } else {
  1500. ast_log(LOG_WARNING, "Invalid scheme: %s\n", uri);
  1501. return STASIS_APP_USER_EVENT_SOURCE_BAD_SCHEME;
  1502. }
  1503. if (!snapshot) {
  1504. ast_log(LOG_ERROR, "Unable to get snapshot for %s\n", uri);
  1505. return STASIS_APP_USER_EVENT_SOURCE_NOT_FOUND;
  1506. }
  1507. ast_multi_object_blob_add(multi, type, snapshot);
  1508. }
  1509. message = stasis_message_create(ast_multi_user_event_type(), multi);
  1510. if (!message) {
  1511. ast_log(LOG_ERROR, "Unable to create stasis user event message\n");
  1512. return res;
  1513. }
  1514. /*
  1515. * Publishing to two different topics is normally to be avoided -- except
  1516. * in this case both are final destinations with no forwards (only listeners).
  1517. * The message has to be delivered to the application topic for ARI, but a
  1518. * copy is also delivered directly to the manager for AMI if there is a channel.
  1519. */
  1520. stasis_publish(ast_app_get_topic(app), message);
  1521. if (have_channel) {
  1522. stasis_publish(ast_manager_get_topic(), message);
  1523. }
  1524. return STASIS_APP_USER_OK;
  1525. }
  1526. void stasis_app_ref(void)
  1527. {
  1528. ast_module_ref(ast_module_info->self);
  1529. }
  1530. void stasis_app_unref(void)
  1531. {
  1532. ast_module_unref(ast_module_info->self);
  1533. }
  1534. /*!
  1535. * \brief Subscription to StasisEnd events
  1536. */
  1537. struct stasis_subscription *stasis_end_sub;
  1538. static int unload_module(void)
  1539. {
  1540. stasis_end_sub = stasis_unsubscribe(stasis_end_sub);
  1541. stasis_app_unregister_event_sources();
  1542. messaging_cleanup();
  1543. ao2_cleanup(apps_registry);
  1544. apps_registry = NULL;
  1545. ao2_cleanup(app_controls);
  1546. app_controls = NULL;
  1547. ao2_cleanup(app_bridges);
  1548. app_bridges = NULL;
  1549. ao2_cleanup(app_bridges_moh);
  1550. app_bridges_moh = NULL;
  1551. ao2_cleanup(app_bridges_playback);
  1552. app_bridges_playback = NULL;
  1553. STASIS_MESSAGE_TYPE_CLEANUP(ast_stasis_end_message_type);
  1554. return 0;
  1555. }
  1556. /* \brief Sanitization callback for channel snapshots */
  1557. static int channel_snapshot_sanitizer(const struct ast_channel_snapshot *snapshot)
  1558. {
  1559. if (!snapshot || !(snapshot->tech_properties & AST_CHAN_TP_INTERNAL)) {
  1560. return 0;
  1561. }
  1562. return 1;
  1563. }
  1564. /* \brief Sanitization callback for channel unique IDs */
  1565. static int channel_id_sanitizer(const char *id)
  1566. {
  1567. RAII_VAR(struct ast_channel_snapshot *, snapshot, ast_channel_snapshot_get_latest(id), ao2_cleanup);
  1568. return channel_snapshot_sanitizer(snapshot);
  1569. }
  1570. /* \brief Sanitization callbacks for communication to Stasis applications */
  1571. struct stasis_message_sanitizer app_sanitizer = {
  1572. .channel_id = channel_id_sanitizer,
  1573. .channel_snapshot = channel_snapshot_sanitizer,
  1574. };
  1575. struct stasis_message_sanitizer *stasis_app_get_sanitizer(void)
  1576. {
  1577. return &app_sanitizer;
  1578. }
  1579. static void remove_masquerade_store_by_name(const char *channel_name)
  1580. {
  1581. struct ast_channel *chan;
  1582. chan = ast_channel_get_by_name(channel_name);
  1583. if (!chan) {
  1584. return;
  1585. }
  1586. remove_masquerade_store(chan);
  1587. ast_channel_unref(chan);
  1588. }
  1589. static void check_for_stasis_end(void *data, struct stasis_subscription *sub,
  1590. struct stasis_message *message)
  1591. {
  1592. struct ast_channel_blob *payload;
  1593. struct ast_channel_snapshot *snapshot;
  1594. const char *app_name;
  1595. char *channel_uri;
  1596. size_t alloc_size;
  1597. const char *channels[1];
  1598. if (stasis_message_type(message) != ast_stasis_end_message_type()) {
  1599. return;
  1600. }
  1601. payload = stasis_message_data(message);
  1602. snapshot = payload->snapshot;
  1603. app_name = ast_json_string_get(ast_json_object_get(payload->blob, "app"));
  1604. /* +8 is for the length of "channel:" */
  1605. alloc_size = AST_MAX_UNIQUEID + 8;
  1606. channel_uri = ast_alloca(alloc_size);
  1607. snprintf(channel_uri, alloc_size, "channel:%s", snapshot->uniqueid);
  1608. channels[0] = channel_uri;
  1609. stasis_app_unsubscribe(app_name, channels, ARRAY_LEN(channels), NULL);
  1610. remove_masquerade_store_by_name(snapshot->name);
  1611. }
  1612. static const struct ast_datastore_info stasis_internal_channel_info = {
  1613. .type = "stasis-internal-channel",
  1614. };
  1615. static int set_internal_datastore(struct ast_channel *chan)
  1616. {
  1617. struct ast_datastore *datastore;
  1618. datastore = ast_channel_datastore_find(chan, &stasis_internal_channel_info, NULL);
  1619. if (!datastore) {
  1620. datastore = ast_datastore_alloc(&stasis_internal_channel_info, NULL);
  1621. if (!datastore) {
  1622. return -1;
  1623. }
  1624. ast_channel_datastore_add(chan, datastore);
  1625. }
  1626. return 0;
  1627. }
  1628. int stasis_app_channel_unreal_set_internal(struct ast_channel *chan)
  1629. {
  1630. struct ast_channel *outchan = NULL, *outowner = NULL;
  1631. int res = 0;
  1632. struct ast_unreal_pvt *unreal_pvt = ast_channel_tech_pvt(chan);
  1633. ao2_ref(unreal_pvt, +1);
  1634. ast_unreal_lock_all(unreal_pvt, &outowner, &outchan);
  1635. if (outowner) {
  1636. res |= set_internal_datastore(outowner);
  1637. ast_channel_unlock(outowner);
  1638. ast_channel_unref(outowner);
  1639. }
  1640. if (outchan) {
  1641. res |= set_internal_datastore(outchan);
  1642. ast_channel_unlock(outchan);
  1643. ast_channel_unref(outchan);
  1644. }
  1645. ao2_unlock(unreal_pvt);
  1646. ao2_ref(unreal_pvt, -1);
  1647. return res;
  1648. }
  1649. int stasis_app_channel_set_internal(struct ast_channel *chan)
  1650. {
  1651. int res;
  1652. ast_channel_lock(chan);
  1653. res = set_internal_datastore(chan);
  1654. ast_channel_unlock(chan);
  1655. return res;
  1656. }
  1657. int stasis_app_channel_is_internal(struct ast_channel *chan)
  1658. {
  1659. struct ast_datastore *datastore;
  1660. int res = 0;
  1661. ast_channel_lock(chan);
  1662. datastore = ast_channel_datastore_find(chan, &stasis_internal_channel_info, NULL);
  1663. if (datastore) {
  1664. res = 1;
  1665. }
  1666. ast_channel_unlock(chan);
  1667. return res;
  1668. }
  1669. static int load_module(void)
  1670. {
  1671. if (STASIS_MESSAGE_TYPE_INIT(ast_stasis_end_message_type) != 0) {
  1672. return AST_MODULE_LOAD_DECLINE;
  1673. }
  1674. apps_registry = ao2_container_alloc(APPS_NUM_BUCKETS, app_hash, app_compare);
  1675. app_controls = ao2_container_alloc(CONTROLS_NUM_BUCKETS, control_hash, control_compare);
  1676. app_bridges = ao2_container_alloc(BRIDGES_NUM_BUCKETS, bridges_hash, bridges_compare);
  1677. app_bridges_moh = ao2_container_alloc_hash(
  1678. AO2_ALLOC_OPT_LOCK_MUTEX, AO2_CONTAINER_ALLOC_OPT_DUPS_REJECT,
  1679. 37, bridges_channel_hash_fn, bridges_channel_sort_fn, NULL);
  1680. app_bridges_playback = ao2_container_alloc_hash(
  1681. AO2_ALLOC_OPT_LOCK_MUTEX, AO2_CONTAINER_ALLOC_OPT_DUPS_REJECT,
  1682. 37, bridges_channel_hash_fn, bridges_channel_sort_fn, NULL);
  1683. if (!apps_registry || !app_controls || !app_bridges || !app_bridges_moh || !app_bridges_playback) {
  1684. unload_module();
  1685. return AST_MODULE_LOAD_FAILURE;
  1686. }
  1687. if (messaging_init()) {
  1688. unload_module();
  1689. return AST_MODULE_LOAD_FAILURE;
  1690. }
  1691. bridge_stasis_init();
  1692. stasis_app_register_event_sources();
  1693. stasis_end_sub = stasis_subscribe(ast_channel_topic_all(), check_for_stasis_end, NULL);
  1694. if (!stasis_end_sub) {
  1695. unload_module();
  1696. return AST_MODULE_LOAD_DECLINE;
  1697. }
  1698. return AST_MODULE_LOAD_SUCCESS;
  1699. }
  1700. AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS, "Stasis application support",
  1701. .support_level = AST_MODULE_SUPPORT_CORE,
  1702. .load = load_module,
  1703. .unload = unload_module,
  1704. );