123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288 |
- /*
- * Asterisk -- An open source telephony toolkit.
- *
- * Copyright (C) 2013, Digium, Inc.
- *
- * David M. Lee, II <dlee@digium.com>
- *
- * See http://www.asterisk.org for more information about
- * the Asterisk project. Please do not directly contact
- * any of the maintainers of this project for assistance;
- * the project provides a web site, mailing lists and IRC
- * channels for your use.
- *
- * This program is free software, distributed under the terms of
- * the GNU General Public License Version 2. See the LICENSE file
- * at the top of the source tree.
- */
- /*!
- * \file \brief Test infrastructure for dealing with Stasis.
- *
- * \author David M. Lee, II <dlee@digium.com>
- */
- /*** MODULEINFO
- <depend>TEST_FRAMEWORK</depend>
- <support_level>core</support_level>
- ***/
- #include "asterisk.h"
- ASTERISK_FILE_VERSION(__FILE__, "$Revision$");
- #include "asterisk/astobj2.h"
- #include "asterisk/module.h"
- #include "asterisk/stasis_test.h"
- STASIS_MESSAGE_TYPE_DEFN(stasis_test_message_type); /*!< Message type initialized in load_module(), cleaned up in unload_module(), and used by stasis_test_message_create(). */
- static void stasis_message_sink_dtor(void *obj)
- {
- struct stasis_message_sink *sink = obj;
- {
- SCOPED_MUTEX(lock, &sink->lock);
- while (!sink->is_done) {
- /* Normally waiting forever is bad, but if we're not
- * done, we're not done. */
- ast_cond_wait(&sink->cond, &sink->lock);
- }
- }
- ast_mutex_destroy(&sink->lock);
- ast_cond_destroy(&sink->cond);
- while (sink->num_messages > 0) {
- ao2_cleanup(sink->messages[--sink->num_messages]);
- }
- ast_free(sink->messages);
- sink->messages = NULL;
- sink->max_messages = 0;
- }
- static struct timespec make_deadline(int timeout_millis)
- {
- struct timeval start = ast_tvnow();
- struct timeval delta = {
- .tv_sec = timeout_millis / 1000,
- .tv_usec = (timeout_millis % 1000) * 1000,
- };
- struct timeval deadline_tv = ast_tvadd(start, delta);
- struct timespec deadline = {
- .tv_sec = deadline_tv.tv_sec,
- .tv_nsec = 1000 * deadline_tv.tv_usec,
- };
- return deadline;
- }
- struct stasis_message_sink *stasis_message_sink_create(void)
- {
- RAII_VAR(struct stasis_message_sink *, sink, NULL, ao2_cleanup);
- sink = ao2_alloc(sizeof(*sink), stasis_message_sink_dtor);
- if (!sink) {
- return NULL;
- }
- ast_mutex_init(&sink->lock);
- ast_cond_init(&sink->cond, NULL);
- sink->max_messages = 4;
- sink->messages =
- ast_malloc(sizeof(*sink->messages) * sink->max_messages);
- if (!sink->messages) {
- return NULL;
- }
- ao2_ref(sink, +1);
- return sink;
- }
- /*!
- * \brief Implementation of the stasis_message_sink_cb() callback.
- *
- * Why the roundabout way of exposing this via stasis_message_sink_cb()? Well,
- * it has to do with how we load modules.
- *
- * Modules have their own metadata compiled into them in the module info block
- * at the end of the file. This includes dependency information in the
- * \c nonoptreq field.
- *
- * Asterisk loads the module, inspects the field, then loads any needed
- * dependencies. This works because Asterisk passes \c RTLD_LAZY to the initial
- * dlopen(), which defers binding function references until they are called.
- *
- * But when you take the address of a function, that function needs to be
- * available at load time. So if some module used the address of
- * message_sink_cb() directly, and \c res_stasis_test.so wasn't loaded yet, then
- * that module would fail to load.
- *
- * The stasis_message_sink_cb() function gives us a layer of indirection so that
- * the initial lazy binding will still work as expected.
- */
- static void message_sink_cb(void *data, struct stasis_subscription *sub,
- struct stasis_message *message)
- {
- struct stasis_message_sink *sink = data;
- SCOPED_MUTEX(lock, &sink->lock);
- if (stasis_subscription_final_message(sub, message)) {
- sink->is_done = 1;
- ast_cond_signal(&sink->cond);
- return;
- }
- if (stasis_subscription_change_type() == stasis_message_type(message)) {
- /* Ignore subscription changes */
- return;
- }
- if (sink->num_messages == sink->max_messages) {
- size_t new_max_messages = sink->max_messages * 2;
- struct stasis_message **new_messages = ast_realloc(
- sink->messages,
- sizeof(*new_messages) * new_max_messages);
- if (!new_messages) {
- return;
- }
- sink->max_messages = new_max_messages;
- sink->messages = new_messages;
- }
- ao2_ref(message, +1);
- sink->messages[sink->num_messages++] = message;
- ast_cond_signal(&sink->cond);
- }
- stasis_subscription_cb stasis_message_sink_cb(void)
- {
- return message_sink_cb;
- }
- int stasis_message_sink_wait_for_count(struct stasis_message_sink *sink,
- int num_messages, int timeout_millis)
- {
- struct timespec deadline = make_deadline(timeout_millis);
- SCOPED_MUTEX(lock, &sink->lock);
- while (sink->num_messages < num_messages) {
- int r = ast_cond_timedwait(&sink->cond, &sink->lock, &deadline);
- if (r == ETIMEDOUT) {
- break;
- }
- if (r != 0) {
- ast_log(LOG_ERROR, "Unexpected condition error: %s\n",
- strerror(r));
- break;
- }
- }
- return sink->num_messages;
- }
- int stasis_message_sink_should_stay(struct stasis_message_sink *sink,
- int num_messages, int timeout_millis)
- {
- struct timespec deadline = make_deadline(timeout_millis);
- SCOPED_MUTEX(lock, &sink->lock);
- while (sink->num_messages == num_messages) {
- int r = ast_cond_timedwait(&sink->cond, &sink->lock, &deadline);
- if (r == ETIMEDOUT) {
- break;
- }
- if (r != 0) {
- ast_log(LOG_ERROR, "Unexpected condition error: %s\n",
- strerror(r));
- break;
- }
- }
- return sink->num_messages;
- }
- int stasis_message_sink_wait_for(struct stasis_message_sink *sink, int start,
- stasis_wait_cb cmp_cb, const void *data, int timeout_millis)
- {
- struct timespec deadline = make_deadline(timeout_millis);
- SCOPED_MUTEX(lock, &sink->lock);
- /* wait for the start */
- while (sink->num_messages < start + 1) {
- int r = ast_cond_timedwait(&sink->cond, &sink->lock, &deadline);
- if (r == ETIMEDOUT) {
- /* Timed out waiting for the start */
- return -1;
- }
- if (r != 0) {
- ast_log(LOG_ERROR, "Unexpected condition error: %s\n",
- strerror(r));
- return -2;
- }
- }
- while (!cmp_cb(sink->messages[start], data)) {
- ++start;
- while (sink->num_messages < start + 1) {
- int r = ast_cond_timedwait(&sink->cond,
- &sink->lock, &deadline);
- if (r == ETIMEDOUT) {
- return -1;
- }
- if (r != 0) {
- ast_log(LOG_ERROR,
- "Unexpected condition error: %s\n",
- strerror(r));
- return -2;
- }
- }
- }
- return start;
- }
/*!
 * \brief Create a test message with a fresh, otherwise empty payload.
 *
 * \return The result of stasis_message_create() for the test message type.
 * \return \c NULL if the test message type is unavailable or the payload
 *         cannot be allocated.
 */
struct stasis_message *stasis_test_message_create(void)
{
	struct stasis_message *msg;
	void *payload;

	if (!stasis_test_message_type()) {
		return NULL;
	}

	/* Only the pointer's uniqueness matters; the contents are unused. */
	payload = ao2_alloc(1, NULL);
	if (!payload) {
		return NULL;
	}

	msg = stasis_message_create(stasis_test_message_type(), payload);

	/* Release the local reference to the payload. */
	ao2_cleanup(payload);
	return msg;
}
- static int unload_module(void)
- {
- STASIS_MESSAGE_TYPE_CLEANUP(stasis_test_message_type);
- return 0;
- }
- static int load_module(void)
- {
- if (STASIS_MESSAGE_TYPE_INIT(stasis_test_message_type) != 0) {
- return AST_MODULE_LOAD_FAILURE;
- }
- return AST_MODULE_LOAD_SUCCESS;
- }
- AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS | AST_MODFLAG_LOAD_ORDER, "Stasis test utilities",
- .support_level = AST_MODULE_SUPPORT_CORE,
- .load = load_module, /* initializes stasis_test_message_type */
- .unload = unload_module, /* cleans up stasis_test_message_type */
- .load_pri = AST_MODPRI_APP_DEPEND,
- );
|