12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219 |
- #include "asterisk.h"
- #include "asterisk/threadpool.h"
- #include "asterisk/taskprocessor.h"
- #include "asterisk/astobj2.h"
- #include "asterisk/utils.h"
- #define THREAD_BUCKETS 89
- struct ast_threadpool {
-
- struct ast_threadpool_listener *listener;
-
- struct ao2_container *active_threads;
-
- struct ao2_container *idle_threads;
-
- struct ao2_container *zombie_threads;
-
- struct ast_taskprocessor *tps;
-
- struct ast_taskprocessor *control_tps;
-
- int shutting_down;
-
- struct ast_threadpool_options options;
- };
/*!
 * \brief Listener attached to a threadpool: a callback table plus opaque user data.
 */
struct ast_threadpool_listener {
	/*! Callback table; each entry is NULL-checked by the pool before it is invoked */
	const struct ast_threadpool_listener_callbacks *callbacks;
	/*! Opaque pointer handed back by ast_threadpool_listener_get_user_data() */
	void *user_data;
};
/*!
 * \brief Lifecycle states of a worker thread.
 */
enum worker_state {
	/*! The worker keeps cycling between idle and active */
	ALIVE,
	/*! The worker was moved to the zombie container by shrink() and must
	 *  exit; on its way out it reports itself dead to the pool */
	ZOMBIE,
	/*! The worker must exit (idle timeout or destruction) */
	DEAD,
};
- struct worker_thread {
-
- int id;
-
- ast_cond_t cond;
-
- ast_mutex_t lock;
-
- pthread_t thread;
-
- struct ast_threadpool *pool;
-
- enum worker_state state;
-
- int wake_up;
-
- struct ast_threadpool_options options;
- };
/* Forward declarations for the worker thread helpers defined later in this file. */

/* Container callbacks */
static int worker_thread_hash(const void *obj, int flags);
static int worker_thread_cmp(void *obj, void *arg, int flags);

/* Allocation, start-up and teardown */
static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool);
static int worker_thread_start(struct worker_thread *worker);
static void worker_thread_destroy(void *obj);
static void worker_shutdown(struct worker_thread *worker);

/* Thread body and state transitions */
static void *worker_start(void *arg);
static int worker_idle(struct worker_thread *worker);
static void worker_active(struct worker_thread *worker);
static void worker_set_state(struct worker_thread *worker, enum worker_state state);
- static void threadpool_send_state_changed(struct ast_threadpool *pool)
- {
- int active_size = ao2_container_count(pool->active_threads);
- int idle_size = ao2_container_count(pool->idle_threads);
- if (pool->listener && pool->listener->callbacks->state_changed) {
- pool->listener->callbacks->state_changed(pool, pool->listener, active_size, idle_size);
- }
- }
/*!
 * \brief Payload for control tasks that act on one worker of one pool.
 */
struct thread_worker_pair {
	/*! Pool the worker belongs to (not referenced by the pair) */
	struct ast_threadpool *pool;
	/*! Worker the task concerns; the pair holds a reference to it */
	struct worker_thread *worker;
};
- static void thread_worker_pair_destructor(void *obj)
- {
- struct thread_worker_pair *pair = obj;
- ao2_ref(pair->worker, -1);
- }
- static struct thread_worker_pair *thread_worker_pair_alloc(struct ast_threadpool *pool,
- struct worker_thread *worker)
- {
- struct thread_worker_pair *pair = ao2_alloc(sizeof(*pair), thread_worker_pair_destructor);
- if (!pair) {
- return NULL;
- }
- pair->pool = pool;
- ao2_ref(worker, +1);
- pair->worker = worker;
- return pair;
- }
- static int queued_active_thread_idle(void *data)
- {
- struct thread_worker_pair *pair = data;
- ao2_link(pair->pool->idle_threads, pair->worker);
- ao2_unlink(pair->pool->active_threads, pair->worker);
- threadpool_send_state_changed(pair->pool);
- ao2_ref(pair, -1);
- return 0;
- }
- static void threadpool_active_thread_idle(struct ast_threadpool *pool,
- struct worker_thread *worker)
- {
- struct thread_worker_pair *pair;
- SCOPED_AO2LOCK(lock, pool);
- if (pool->shutting_down) {
- return;
- }
- pair = thread_worker_pair_alloc(pool, worker);
- if (!pair) {
- return;
- }
- ast_taskprocessor_push(pool->control_tps, queued_active_thread_idle, pair);
- }
- static int queued_zombie_thread_dead(void *data)
- {
- struct thread_worker_pair *pair = data;
- ao2_unlink(pair->pool->zombie_threads, pair->worker);
- threadpool_send_state_changed(pair->pool);
- ao2_ref(pair, -1);
- return 0;
- }
- static void threadpool_zombie_thread_dead(struct ast_threadpool *pool,
- struct worker_thread *worker)
- {
- struct thread_worker_pair *pair;
- SCOPED_AO2LOCK(lock, pool);
- if (pool->shutting_down) {
- return;
- }
- pair = thread_worker_pair_alloc(pool, worker);
- if (!pair) {
- return;
- }
- ast_taskprocessor_push(pool->control_tps, queued_zombie_thread_dead, pair);
- }
- static int queued_idle_thread_dead(void *data)
- {
- struct thread_worker_pair *pair = data;
- ao2_unlink(pair->pool->idle_threads, pair->worker);
- threadpool_send_state_changed(pair->pool);
- ao2_ref(pair, -1);
- return 0;
- }
- static void threadpool_idle_thread_dead(struct ast_threadpool *pool,
- struct worker_thread *worker)
- {
- struct thread_worker_pair *pair;
- SCOPED_AO2LOCK(lock, pool);
- if (pool->shutting_down) {
- return;
- }
- pair = thread_worker_pair_alloc(pool, worker);
- if (!pair) {
- return;
- }
- ast_taskprocessor_push(pool->control_tps, queued_idle_thread_dead, pair);
- }
- static int threadpool_execute(struct ast_threadpool *pool)
- {
- ao2_lock(pool);
- if (!pool->shutting_down) {
- ao2_unlock(pool);
- return ast_taskprocessor_execute(pool->tps);
- }
- ao2_unlock(pool);
- return 0;
- }
- static void threadpool_destructor(void *obj)
- {
- struct ast_threadpool *pool = obj;
- ao2_cleanup(pool->listener);
- }
- static struct ast_threadpool *threadpool_alloc(const char *name, const struct ast_threadpool_options *options)
- {
- RAII_VAR(struct ast_threadpool *, pool, NULL, ao2_cleanup);
- struct ast_str *control_tps_name;
- pool = ao2_alloc(sizeof(*pool), threadpool_destructor);
- control_tps_name = ast_str_create(64);
- if (!pool || !control_tps_name) {
- ast_free(control_tps_name);
- return NULL;
- }
- ast_str_set(&control_tps_name, 0, "%s-control", name);
- pool->control_tps = ast_taskprocessor_get(ast_str_buffer(control_tps_name), TPS_REF_DEFAULT);
- ast_free(control_tps_name);
- if (!pool->control_tps) {
- return NULL;
- }
- pool->active_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp);
- if (!pool->active_threads) {
- return NULL;
- }
- pool->idle_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp);
- if (!pool->idle_threads) {
- return NULL;
- }
- pool->zombie_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp);
- if (!pool->zombie_threads) {
- return NULL;
- }
- pool->options = *options;
- ao2_ref(pool, +1);
- return pool;
- }
/*!
 * \brief Taskprocessor listener "start" callback for the pool.
 *
 * The pool needs no work at this point, so this only reports success.
 *
 * \param listener The taskprocessor listener (unused)
 * \return Always 0
 */
static int threadpool_tps_start(struct ast_taskprocessor_listener *listener)
{
	/* Nothing to set up */
	return 0;
}
/*!
 * \brief Payload for the control task queued when a task is pushed to the pool.
 */
struct task_pushed_data {
	/*! Pool the task was pushed to (plain pointer) */
	struct ast_threadpool *pool;
	/*! Whether the taskprocessor was empty before the push, as reported to the listener callback */
	int was_empty;
};
- static struct task_pushed_data *task_pushed_data_alloc(struct ast_threadpool *pool,
- int was_empty)
- {
- struct task_pushed_data *tpd = ao2_alloc(sizeof(*tpd), NULL);
- if (!tpd) {
- return NULL;
- }
- tpd->pool = pool;
- tpd->was_empty = was_empty;
- return tpd;
- }
- static int activate_thread(void *obj, void *arg, int flags)
- {
- struct worker_thread *worker = obj;
- struct ast_threadpool *pool = arg;
- if (!ao2_link(pool->active_threads, worker)) {
-
- ast_log(LOG_WARNING, "Failed to activate thread %d. Remaining idle\n",
- worker->id);
- return 0;
- }
- worker_set_state(worker, ALIVE);
- return CMP_MATCH;
- }
- static void grow(struct ast_threadpool *pool, int delta)
- {
- int i;
- int current_size = ao2_container_count(pool->active_threads) +
- ao2_container_count(pool->idle_threads);
- if (pool->options.max_size && current_size + delta > pool->options.max_size) {
- delta = pool->options.max_size - current_size;
- }
- ast_debug(3, "Increasing threadpool %s's size by %d\n",
- ast_taskprocessor_name(pool->tps), delta);
- for (i = 0; i < delta; ++i) {
- struct worker_thread *worker = worker_thread_alloc(pool);
- if (!worker) {
- return;
- }
- if (ao2_link(pool->idle_threads, worker)) {
- if (worker_thread_start(worker)) {
- ast_log(LOG_ERROR, "Unable to start worker thread %d. Destroying.\n", worker->id);
- ao2_unlink(pool->active_threads, worker);
- }
- } else {
- ast_log(LOG_WARNING, "Failed to activate worker thread %d. Destroying.\n", worker->id);
- }
- ao2_ref(worker, -1);
- }
- }
- static int queued_task_pushed(void *data)
- {
- struct task_pushed_data *tpd = data;
- struct ast_threadpool *pool = tpd->pool;
- int was_empty = tpd->was_empty;
- if (pool->listener && pool->listener->callbacks->task_pushed) {
- pool->listener->callbacks->task_pushed(pool, pool->listener, was_empty);
- }
- if (ao2_container_count(pool->idle_threads) == 0) {
- if (!pool->options.auto_increment) {
- return 0;
- }
- grow(pool, pool->options.auto_increment);
- }
- ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA,
- activate_thread, pool);
- threadpool_send_state_changed(pool);
- ao2_ref(tpd, -1);
- return 0;
- }
- static void threadpool_tps_task_pushed(struct ast_taskprocessor_listener *listener,
- int was_empty)
- {
- struct ast_threadpool *pool = ast_taskprocessor_listener_get_user_data(listener);
- struct task_pushed_data *tpd;
- SCOPED_AO2LOCK(lock, pool);
- if (pool->shutting_down) {
- return;
- }
- tpd = task_pushed_data_alloc(pool, was_empty);
- if (!tpd) {
- return;
- }
- ast_taskprocessor_push(pool->control_tps, queued_task_pushed, tpd);
- }
- static int queued_emptied(void *data)
- {
- struct ast_threadpool *pool = data;
-
- pool->listener->callbacks->emptied(pool, pool->listener);
- return 0;
- }
- static void threadpool_tps_emptied(struct ast_taskprocessor_listener *listener)
- {
- struct ast_threadpool *pool = ast_taskprocessor_listener_get_user_data(listener);
- SCOPED_AO2LOCK(lock, pool);
- if (pool->shutting_down) {
- return;
- }
- if (pool->listener && pool->listener->callbacks->emptied) {
- ast_taskprocessor_push(pool->control_tps, queued_emptied, pool);
- }
- }
- static void threadpool_tps_shutdown(struct ast_taskprocessor_listener *listener)
- {
- struct ast_threadpool *pool = ast_taskprocessor_listener_get_user_data(listener);
- if (pool->listener && pool->listener->callbacks->shutdown) {
- pool->listener->callbacks->shutdown(pool->listener);
- }
- ao2_cleanup(pool->active_threads);
- ao2_cleanup(pool->idle_threads);
- ao2_cleanup(pool->zombie_threads);
- ao2_cleanup(pool);
- }
- static struct ast_taskprocessor_listener_callbacks threadpool_tps_listener_callbacks = {
- .start = threadpool_tps_start,
- .task_pushed = threadpool_tps_task_pushed,
- .emptied = threadpool_tps_emptied,
- .shutdown = threadpool_tps_shutdown,
- };
- static int kill_threads(void *obj, void *arg, int flags)
- {
- int *num_to_kill = arg;
- if (*num_to_kill > 0) {
- --(*num_to_kill);
- return CMP_MATCH;
- } else {
- return CMP_STOP;
- }
- }
- static int zombify_threads(void *obj, void *arg, void *data, int flags)
- {
- struct worker_thread *worker = obj;
- struct ast_threadpool *pool = arg;
- int *num_to_zombify = data;
- if ((*num_to_zombify)-- > 0) {
- if (!ao2_link(pool->zombie_threads, worker)) {
- ast_log(LOG_WARNING, "Failed to zombify active thread %d. Thread will remain active\n", worker->id);
- return 0;
- }
- worker_set_state(worker, ZOMBIE);
- return CMP_MATCH;
- } else {
- return CMP_STOP;
- }
- }
- static void shrink(struct ast_threadpool *pool, int delta)
- {
-
- int idle_threads = ao2_container_count(pool->idle_threads);
- int idle_threads_to_kill = MIN(delta, idle_threads);
- int active_threads_to_zombify = delta - idle_threads_to_kill;
- ast_debug(3, "Destroying %d idle threads in threadpool %s\n", idle_threads_to_kill,
- ast_taskprocessor_name(pool->tps));
- ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
- kill_threads, &idle_threads_to_kill);
- ast_debug(3, "Destroying %d active threads in threadpool %s\n", active_threads_to_zombify,
- ast_taskprocessor_name(pool->tps));
- ao2_callback_data(pool->active_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
- zombify_threads, pool, &active_threads_to_zombify);
- }
/*!
 * \brief Payload for the control task that resizes a pool.
 */
struct set_size_data {
	/*! Pool to resize (plain pointer) */
	struct ast_threadpool *pool;
	/*! Requested total number of active plus idle workers */
	unsigned int size;
};
- static struct set_size_data *set_size_data_alloc(struct ast_threadpool *pool,
- unsigned int size)
- {
- struct set_size_data *ssd = ao2_alloc(sizeof(*ssd), NULL);
- if (!ssd) {
- return NULL;
- }
- ssd->pool = pool;
- ssd->size = size;
- return ssd;
- }
- static int queued_set_size(void *data)
- {
- RAII_VAR(struct set_size_data *, ssd, data, ao2_cleanup);
- struct ast_threadpool *pool = ssd->pool;
- unsigned int num_threads = ssd->size;
-
- unsigned int current_size = ao2_container_count(pool->active_threads) +
- ao2_container_count(pool->idle_threads);
- if (current_size == num_threads) {
- ast_debug(3, "Not changing threadpool size since new size %u is the same as current %u\n",
- num_threads, current_size);
- return 0;
- }
- if (current_size < num_threads) {
- grow(pool, num_threads - current_size);
- ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
- activate_thread, pool);
- } else {
- shrink(pool, current_size - num_threads);
- }
- threadpool_send_state_changed(pool);
- return 0;
- }
- void ast_threadpool_set_size(struct ast_threadpool *pool, unsigned int size)
- {
- struct set_size_data *ssd;
- SCOPED_AO2LOCK(lock, pool);
- if (pool->shutting_down) {
- return;
- }
- ssd = set_size_data_alloc(pool, size);
- if (!ssd) {
- return;
- }
- ast_taskprocessor_push(pool->control_tps, queued_set_size, ssd);
- }
- struct ast_threadpool_listener *ast_threadpool_listener_alloc(
- const struct ast_threadpool_listener_callbacks *callbacks, void *user_data)
- {
- struct ast_threadpool_listener *listener = ao2_alloc(sizeof(*listener), NULL);
- if (!listener) {
- return NULL;
- }
- listener->callbacks = callbacks;
- listener->user_data = user_data;
- return listener;
- }
- void *ast_threadpool_listener_get_user_data(const struct ast_threadpool_listener *listener)
- {
- return listener->user_data;
- }
- struct pool_options_pair {
- struct ast_threadpool *pool;
- struct ast_threadpool_options options;
- };
- struct ast_threadpool *ast_threadpool_create(const char *name,
- struct ast_threadpool_listener *listener,
- const struct ast_threadpool_options *options)
- {
- struct ast_taskprocessor *tps;
- RAII_VAR(struct ast_taskprocessor_listener *, tps_listener, NULL, ao2_cleanup);
- RAII_VAR(struct ast_threadpool *, pool, NULL, ao2_cleanup);
- pool = threadpool_alloc(name, options);
- if (!pool) {
- return NULL;
- }
- tps_listener = ast_taskprocessor_listener_alloc(&threadpool_tps_listener_callbacks, pool);
- if (!tps_listener) {
- return NULL;
- }
- if (options->version != AST_THREADPOOL_OPTIONS_VERSION) {
- ast_log(LOG_WARNING, "Incompatible version of threadpool options in use.\n");
- return NULL;
- }
- tps = ast_taskprocessor_create_with_listener(name, tps_listener);
- if (!tps) {
- return NULL;
- }
- pool->tps = tps;
- if (listener) {
- ao2_ref(listener, +1);
- pool->listener = listener;
- }
- ast_threadpool_set_size(pool, pool->options.initial_size);
- ao2_ref(pool, +1);
- return pool;
- }
- int ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), void *data)
- {
- SCOPED_AO2LOCK(lock, pool);
- if (!pool->shutting_down) {
- return ast_taskprocessor_push(pool->tps, task, data);
- }
- return -1;
- }
- void ast_threadpool_shutdown(struct ast_threadpool *pool)
- {
- if (!pool) {
- return;
- }
-
- ao2_lock(pool);
- pool->shutting_down = 1;
- ao2_unlock(pool);
- ast_taskprocessor_unreference(pool->control_tps);
- ast_taskprocessor_unreference(pool->tps);
- }
- static int worker_id_counter;
- static int worker_thread_hash(const void *obj, int flags)
- {
- const struct worker_thread *worker = obj;
- return worker->id;
- }
- static int worker_thread_cmp(void *obj, void *arg, int flags)
- {
- struct worker_thread *worker1 = obj;
- struct worker_thread *worker2 = arg;
- return worker1->id == worker2->id ? CMP_MATCH : 0;
- }
- static void worker_shutdown(struct worker_thread *worker)
- {
- worker_set_state(worker, DEAD);
- if (worker->thread != AST_PTHREADT_NULL) {
- pthread_join(worker->thread, NULL);
- worker->thread = AST_PTHREADT_NULL;
- }
- }
- static void worker_thread_destroy(void *obj)
- {
- struct worker_thread *worker = obj;
- ast_debug(3, "Destroying worker thread %d\n", worker->id);
- worker_shutdown(worker);
- ast_mutex_destroy(&worker->lock);
- ast_cond_destroy(&worker->cond);
- }
- static void *worker_start(void *arg)
- {
- struct worker_thread *worker = arg;
- if (worker->options.thread_start) {
- worker->options.thread_start();
- }
- ast_mutex_lock(&worker->lock);
- while (worker_idle(worker)) {
- ast_mutex_unlock(&worker->lock);
- worker_active(worker);
- ast_mutex_lock(&worker->lock);
- if (worker->state != ALIVE) {
- break;
- }
- threadpool_active_thread_idle(worker->pool, worker);
- }
- ast_mutex_unlock(&worker->lock);
-
- if (worker->state == ZOMBIE) {
- threadpool_zombie_thread_dead(worker->pool, worker);
- }
- if (worker->options.thread_end) {
- worker->options.thread_end();
- }
- return NULL;
- }
- static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool)
- {
- struct worker_thread *worker = ao2_alloc(sizeof(*worker), worker_thread_destroy);
- if (!worker) {
- return NULL;
- }
- worker->id = ast_atomic_fetchadd_int(&worker_id_counter, 1);
- ast_mutex_init(&worker->lock);
- ast_cond_init(&worker->cond, NULL);
- worker->pool = pool;
- worker->thread = AST_PTHREADT_NULL;
- worker->state = ALIVE;
- worker->options = pool->options;
- return worker;
- }
- static int worker_thread_start(struct worker_thread *worker)
- {
- return ast_pthread_create(&worker->thread, NULL, worker_start, worker);
- }
- static void worker_active(struct worker_thread *worker)
- {
- int alive;
-
- do {
- alive = threadpool_execute(worker->pool);
- } while (alive);
- }
- static int worker_idle(struct worker_thread *worker)
- {
- struct timeval start = ast_tvnow();
- struct timespec end = {
- .tv_sec = start.tv_sec + worker->options.idle_timeout,
- .tv_nsec = start.tv_usec * 1000,
- };
- while (!worker->wake_up) {
- if (worker->options.idle_timeout <= 0) {
- ast_cond_wait(&worker->cond, &worker->lock);
- } else if (ast_cond_timedwait(&worker->cond, &worker->lock, &end) == ETIMEDOUT) {
- break;
- }
- }
- if (!worker->wake_up) {
- ast_debug(1, "Worker thread idle timeout reached. Dying.\n");
- threadpool_idle_thread_dead(worker->pool, worker);
- worker->state = DEAD;
- }
- worker->wake_up = 0;
- return worker->state == ALIVE;
- }
- static void worker_set_state(struct worker_thread *worker, enum worker_state state)
- {
- SCOPED_MUTEX(lock, &worker->lock);
- worker->state = state;
- worker->wake_up = 1;
- ast_cond_signal(&worker->cond);
- }
/*!
 * \brief State of a serializer: a taskprocessor whose tasks run on a threadpool.
 */
struct serializer {
	/*! Pool that executes the serializer's tasks; the serializer holds a reference */
	struct ast_threadpool *pool;
};
- static void serializer_dtor(void *obj)
- {
- struct serializer *ser = obj;
- ao2_cleanup(ser->pool);
- ser->pool = NULL;
- }
- static struct serializer *serializer_create(struct ast_threadpool *pool)
- {
- struct serializer *ser;
- ser = ao2_alloc_options(sizeof(*ser), serializer_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
- if (!ser) {
- return NULL;
- }
- ao2_ref(pool, +1);
- ser->pool = pool;
- return ser;
- }
/*!
 * \brief Threadpool task that drains a serializer's taskprocessor.
 *
 * Keeps calling ast_taskprocessor_execute() until it returns 0, then
 * releases the taskprocessor reference that was handed over with the task.
 *
 * \param data The serializer's taskprocessor
 * \return Always 0
 */
static int execute_tasks(void *data)
{
	struct ast_taskprocessor *serial_tps = data;

	for (;;) {
		if (!ast_taskprocessor_execute(serial_tps)) {
			break;
		}
	}

	ast_taskprocessor_unreference(serial_tps);
	return 0;
}
- static void serializer_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
- {
- if (was_empty) {
- struct serializer *ser = ast_taskprocessor_listener_get_user_data(listener);
- struct ast_taskprocessor *tps = ast_taskprocessor_listener_get_tps(listener);
- if (ast_threadpool_push(ser->pool, execute_tasks, tps)) {
- ast_taskprocessor_unreference(tps);
- }
- }
- }
/*!
 * \brief Taskprocessor listener "start" callback for a serializer.
 *
 * A serializer needs no work at this point, so this only reports success.
 *
 * \param listener The taskprocessor listener (unused)
 * \return Always 0
 */
static int serializer_start(struct ast_taskprocessor_listener *listener)
{
	/* Nothing to set up */
	return 0;
}
/*!
 * \brief Taskprocessor listener "shutdown" callback for a serializer.
 *
 * Releases the serializer stored as the listener's user data.
 *
 * \param listener The serializer's taskprocessor listener
 */
static void serializer_shutdown(struct ast_taskprocessor_listener *listener)
{
	struct serializer *doomed = ast_taskprocessor_listener_get_user_data(listener);

	ao2_cleanup(doomed);
}
- static struct ast_taskprocessor_listener_callbacks serializer_tps_listener_callbacks = {
- .task_pushed = serializer_task_pushed,
- .start = serializer_start,
- .shutdown = serializer_shutdown,
- };
- struct ast_taskprocessor *ast_threadpool_serializer(const char *name, struct ast_threadpool *pool)
- {
- RAII_VAR(struct serializer *, ser, NULL, ao2_cleanup);
- RAII_VAR(struct ast_taskprocessor_listener *, listener, NULL, ao2_cleanup);
- struct ast_taskprocessor *tps = NULL;
- ser = serializer_create(pool);
- if (!ser) {
- return NULL;
- }
- listener = ast_taskprocessor_listener_alloc(&serializer_tps_listener_callbacks, ser);
- if (!listener) {
- return NULL;
- }
- ser = NULL;
- tps = ast_taskprocessor_create_with_listener(name, listener);
- if (!tps) {
- return NULL;
- }
- return tps;
- }
|