res_stasis_test.c 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288
  1. /*
  2. * Asterisk -- An open source telephony toolkit.
  3. *
  4. * Copyright (C) 2013, Digium, Inc.
  5. *
  6. * David M. Lee, II <dlee@digium.com>
  7. *
  8. * See http://www.asterisk.org for more information about
  9. * the Asterisk project. Please do not directly contact
  10. * any of the maintainers of this project for assistance;
  11. * the project provides a web site, mailing lists and IRC
  12. * channels for your use.
  13. *
  14. * This program is free software, distributed under the terms of
  15. * the GNU General Public License Version 2. See the LICENSE file
  16. * at the top of the source tree.
  17. */
  18. /*!
  19. * \file \brief Test infrastructure for dealing with Stasis.
  20. *
  21. * \author David M. Lee, II <dlee@digium.com>
  22. */
  23. /*** MODULEINFO
  24. <depend>TEST_FRAMEWORK</depend>
  25. <support_level>core</support_level>
  26. ***/
  27. #include "asterisk.h"
  28. ASTERISK_FILE_VERSION(__FILE__, "$Revision$");
  29. #include "asterisk/astobj2.h"
  30. #include "asterisk/module.h"
  31. #include "asterisk/stasis_test.h"
  /*
   * Message type used by stasis_test_message_create(). It is set up in
   * load_module() and torn down in unload_module().
   */
  STASIS_MESSAGE_TYPE_DEFN(stasis_test_message_type);
  33. static void stasis_message_sink_dtor(void *obj)
  34. {
  35. struct stasis_message_sink *sink = obj;
  36. {
  37. SCOPED_MUTEX(lock, &sink->lock);
  38. while (!sink->is_done) {
  39. /* Normally waiting forever is bad, but if we're not
  40. * done, we're not done. */
  41. ast_cond_wait(&sink->cond, &sink->lock);
  42. }
  43. }
  44. ast_mutex_destroy(&sink->lock);
  45. ast_cond_destroy(&sink->cond);
  46. while (sink->num_messages > 0) {
  47. ao2_cleanup(sink->messages[--sink->num_messages]);
  48. }
  49. ast_free(sink->messages);
  50. sink->messages = NULL;
  51. sink->max_messages = 0;
  52. }
  53. static struct timespec make_deadline(int timeout_millis)
  54. {
  55. struct timeval start = ast_tvnow();
  56. struct timeval delta = {
  57. .tv_sec = timeout_millis / 1000,
  58. .tv_usec = (timeout_millis % 1000) * 1000,
  59. };
  60. struct timeval deadline_tv = ast_tvadd(start, delta);
  61. struct timespec deadline = {
  62. .tv_sec = deadline_tv.tv_sec,
  63. .tv_nsec = 1000 * deadline_tv.tv_usec,
  64. };
  65. return deadline;
  66. }
  67. struct stasis_message_sink *stasis_message_sink_create(void)
  68. {
  69. RAII_VAR(struct stasis_message_sink *, sink, NULL, ao2_cleanup);
  70. sink = ao2_alloc(sizeof(*sink), stasis_message_sink_dtor);
  71. if (!sink) {
  72. return NULL;
  73. }
  74. ast_mutex_init(&sink->lock);
  75. ast_cond_init(&sink->cond, NULL);
  76. sink->max_messages = 4;
  77. sink->messages =
  78. ast_malloc(sizeof(*sink->messages) * sink->max_messages);
  79. if (!sink->messages) {
  80. return NULL;
  81. }
  82. ao2_ref(sink, +1);
  83. return sink;
  84. }
  85. /*!
  86. * \brief Implementation of the stasis_message_sink_cb() callback.
  87. *
  88. * Why the roundabout way of exposing this via stasis_message_sink_cb()? Well,
  89. * it has to do with how we load modules.
  90. *
  91. * Modules have their own metadata compiled into them in the module info block
  92. * at the end of the file. This includes dependency information in the
  93. * \c nonoptreq field.
  94. *
  95. * Asterisk loads the module, inspects the field, then loads any needed
  96. * dependencies. This works because Asterisk passes \c RTLD_LAZY to the initial
  97. * dlopen(), which defers binding function references until they are called.
  98. *
  99. * But when you take the address of a function, that function needs to be
  100. * available at load time. So if some module used the address of
  101. * message_sink_cb() directly, and \c res_stasis_test.so wasn't loaded yet, then
  102. * that module would fail to load.
  103. *
  104. * The stasis_message_sink_cb() function gives us a layer of indirection so that
  105. * the initial lazy binding will still work as expected.
  106. */
  107. static void message_sink_cb(void *data, struct stasis_subscription *sub,
  108. struct stasis_message *message)
  109. {
  110. struct stasis_message_sink *sink = data;
  111. SCOPED_MUTEX(lock, &sink->lock);
  112. if (stasis_subscription_final_message(sub, message)) {
  113. sink->is_done = 1;
  114. ast_cond_signal(&sink->cond);
  115. return;
  116. }
  117. if (stasis_subscription_change_type() == stasis_message_type(message)) {
  118. /* Ignore subscription changes */
  119. return;
  120. }
  121. if (sink->num_messages == sink->max_messages) {
  122. size_t new_max_messages = sink->max_messages * 2;
  123. struct stasis_message **new_messages = ast_realloc(
  124. sink->messages,
  125. sizeof(*new_messages) * new_max_messages);
  126. if (!new_messages) {
  127. return;
  128. }
  129. sink->max_messages = new_max_messages;
  130. sink->messages = new_messages;
  131. }
  132. ao2_ref(message, +1);
  133. sink->messages[sink->num_messages++] = message;
  134. ast_cond_signal(&sink->cond);
  135. }
  136. stasis_subscription_cb stasis_message_sink_cb(void)
  137. {
  138. return message_sink_cb;
  139. }
  140. int stasis_message_sink_wait_for_count(struct stasis_message_sink *sink,
  141. int num_messages, int timeout_millis)
  142. {
  143. struct timespec deadline = make_deadline(timeout_millis);
  144. SCOPED_MUTEX(lock, &sink->lock);
  145. while (sink->num_messages < num_messages) {
  146. int r = ast_cond_timedwait(&sink->cond, &sink->lock, &deadline);
  147. if (r == ETIMEDOUT) {
  148. break;
  149. }
  150. if (r != 0) {
  151. ast_log(LOG_ERROR, "Unexpected condition error: %s\n",
  152. strerror(r));
  153. break;
  154. }
  155. }
  156. return sink->num_messages;
  157. }
  158. int stasis_message_sink_should_stay(struct stasis_message_sink *sink,
  159. int num_messages, int timeout_millis)
  160. {
  161. struct timespec deadline = make_deadline(timeout_millis);
  162. SCOPED_MUTEX(lock, &sink->lock);
  163. while (sink->num_messages == num_messages) {
  164. int r = ast_cond_timedwait(&sink->cond, &sink->lock, &deadline);
  165. if (r == ETIMEDOUT) {
  166. break;
  167. }
  168. if (r != 0) {
  169. ast_log(LOG_ERROR, "Unexpected condition error: %s\n",
  170. strerror(r));
  171. break;
  172. }
  173. }
  174. return sink->num_messages;
  175. }
  176. int stasis_message_sink_wait_for(struct stasis_message_sink *sink, int start,
  177. stasis_wait_cb cmp_cb, const void *data, int timeout_millis)
  178. {
  179. struct timespec deadline = make_deadline(timeout_millis);
  180. SCOPED_MUTEX(lock, &sink->lock);
  181. /* wait for the start */
  182. while (sink->num_messages < start + 1) {
  183. int r = ast_cond_timedwait(&sink->cond, &sink->lock, &deadline);
  184. if (r == ETIMEDOUT) {
  185. /* Timed out waiting for the start */
  186. return -1;
  187. }
  188. if (r != 0) {
  189. ast_log(LOG_ERROR, "Unexpected condition error: %s\n",
  190. strerror(r));
  191. return -2;
  192. }
  193. }
  194. while (!cmp_cb(sink->messages[start], data)) {
  195. ++start;
  196. while (sink->num_messages < start + 1) {
  197. int r = ast_cond_timedwait(&sink->cond,
  198. &sink->lock, &deadline);
  199. if (r == ETIMEDOUT) {
  200. return -1;
  201. }
  202. if (r != 0) {
  203. ast_log(LOG_ERROR,
  204. "Unexpected condition error: %s\n",
  205. strerror(r));
  206. return -2;
  207. }
  208. }
  209. }
  210. return start;
  211. }
  /*!
   * \brief Create a test message with a unique, otherwise meaningless payload.
   *
   * \return Result of stasis_message_create() for the test message type.
   * \return \c NULL if the test message type is unavailable or the payload
   *         cannot be allocated.
   */
  struct stasis_message *stasis_test_message_create(void)
  {
      struct stasis_message *msg;
      void *payload;

      if (!stasis_test_message_type()) {
          return NULL;
      }

      /* Only the pointer identity matters; the contents are never read */
      payload = ao2_alloc(1, NULL);
      if (!payload) {
          return NULL;
      }

      msg = stasis_message_create(stasis_test_message_type(), payload);

      /* Drop the local payload reference now that creation was attempted */
      ao2_cleanup(payload);
      return msg;
  }
  225. static int unload_module(void)
  226. {
  227. STASIS_MESSAGE_TYPE_CLEANUP(stasis_test_message_type);
  228. return 0;
  229. }
  230. static int load_module(void)
  231. {
  232. if (STASIS_MESSAGE_TYPE_INIT(stasis_test_message_type) != 0) {
  233. return AST_MODULE_LOAD_FAILURE;
  234. }
  235. return AST_MODULE_LOAD_SUCCESS;
  236. }
  237. AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS | AST_MODFLAG_LOAD_ORDER, "Stasis test utilities",
  238. .support_level = AST_MODULE_SUPPORT_CORE,
  239. .load = load_module,
  240. .unload = unload_module,
  241. .load_pri = AST_MODPRI_APP_DEPEND,
  242. );