threadpool.c

/*
 * Asterisk -- An open source telephony toolkit.
 *
 * Copyright (C) 2012-2013, Digium, Inc.
 *
 * Mark Michelson <mmmichelson@digium.com>
 *
 * See http://www.asterisk.org for more information about
 * the Asterisk project. Please do not directly contact
 * any of the maintainers of this project for assistance;
 * the project provides a web site, mailing lists and IRC
 * channels for your use.
 *
 * This program is free software, distributed under the terms of
 * the GNU General Public License Version 2. See the LICENSE file
 * at the top of the source tree.
 */

#include "asterisk.h"
#include "asterisk/threadpool.h"
#include "asterisk/taskprocessor.h"
#include "asterisk/astobj2.h"
#include "asterisk/utils.h"

/* Needs to stay prime if increased */
#define THREAD_BUCKETS 89

/*!
 * \brief An opaque threadpool structure
 *
 * A threadpool is a collection of threads that execute
 * tasks from a common queue.
 */
struct ast_threadpool {
	/*! Threadpool listener */
	struct ast_threadpool_listener *listener;
	/*!
	 * \brief The container of active threads.
	 * Active threads are those that are currently running tasks.
	 */
	struct ao2_container *active_threads;
	/*!
	 * \brief The container of idle threads.
	 * Idle threads are those that are currently waiting to run tasks.
	 */
	struct ao2_container *idle_threads;
	/*!
	 * \brief The container of zombie threads.
	 * Zombie threads may be running tasks, but they are scheduled to die soon.
	 */
	struct ao2_container *zombie_threads;
	/*!
	 * \brief The main taskprocessor
	 *
	 * Tasks that are queued in this taskprocessor are
	 * doled out to the worker threads. Worker threads that
	 * execute tasks from the threadpool are executing tasks
	 * in this taskprocessor.
	 *
	 * The threadpool itself is actually the private data for
	 * this taskprocessor's listener. This way, as taskprocessor
	 * changes occur, the threadpool can alert its listeners
	 * appropriately.
	 */
	struct ast_taskprocessor *tps;
	/*!
	 * \brief The control taskprocessor
	 *
	 * This is a standard taskprocessor that uses the default
	 * taskprocessor listener. In other words, all tasks queued to
	 * this taskprocessor have a single thread that executes the
	 * tasks.
	 *
	 * All tasks that modify the state of the threadpool and all tasks
	 * that call out to threadpool listeners are pushed to this
	 * taskprocessor.
	 *
	 * For instance, when the threadpool changes size, a task is put
	 * into this taskprocessor to do so. When it comes time to tell the
	 * threadpool listener that worker threads have changed state,
	 * the task is placed in this taskprocessor.
	 *
	 * This is done for three main reasons:
	 * 1) It ensures that listeners are given an accurate portrayal
	 * of the threadpool's current state. In other words, when a listener
	 * gets told a count of active, idle and zombie threads, it does not
	 * need to worry that the internal state of the threadpool might be
	 * different from what it has been told.
	 * 2) It minimizes the locking required in both the threadpool and in
	 * the threadpool listener's callbacks.
	 * 3) It ensures that listener callbacks are called in the same order
	 * in which the threadpool's state changed.
	 */
	struct ast_taskprocessor *control_tps;
	/*! True if the threadpool is in the process of shutting down */
	int shutting_down;
	/*! Threadpool-specific options */
	struct ast_threadpool_options options;
};
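
/*!
 * \internal
 * \note Editorial sketch, not part of the original code: every queueing helper
 * below that changes pool state from a worker thread follows the same pattern
 * of locking the pool, bailing out if it is shutting down, and then pushing the
 * real work onto the control taskprocessor so listener callbacks stay
 * serialized. The names queue_state_change, queued_callback and state_data are
 * hypothetical placeholders.
 *
 * \code
 * static void queue_state_change(struct ast_threadpool *pool, void *state_data)
 * {
 *	SCOPED_AO2LOCK(lock, pool);
 *	if (pool->shutting_down) {
 *		return;
 *	}
 *	ast_taskprocessor_push(pool->control_tps, queued_callback, state_data);
 * }
 * \endcode
 *
 * See threadpool_active_thread_idle() and threadpool_zombie_thread_dead()
 * below for the real instances of this pattern.
 */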

/*!
 * \brief Listener for a threadpool
 *
 * The listener is notified of changes in a threadpool. It can
 * react by doing things like increasing the number of threads
 * in the pool.
 */
struct ast_threadpool_listener {
	/*! Callbacks called by the threadpool */
	const struct ast_threadpool_listener_callbacks *callbacks;
	/*! User data for the listener */
	void *user_data;
};
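
/*!
 * \internal
 * \note A minimal sketch, added for illustration and not taken from the
 * original source, of the callbacks a threadpool user might supply. The names
 * my_state_changed and my_callbacks are hypothetical; only the callbacks the
 * user cares about need to be set, since the pool NULL-checks each one before
 * calling it.
 *
 * \code
 * static void my_state_changed(struct ast_threadpool *pool,
 *	struct ast_threadpool_listener *listener,
 *	int active_threads, int idle_threads)
 * {
 *	ast_debug(3, "Pool has %d active and %d idle threads\n",
 *		active_threads, idle_threads);
 * }
 *
 * static const struct ast_threadpool_listener_callbacks my_callbacks = {
 *	.state_changed = my_state_changed,
 * };
 * \endcode
 */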

/*!
 * \brief States for worker threads
 */
enum worker_state {
	/*! The worker is either active or idle */
	ALIVE,
	/*!
	 * The worker has been asked to shut down but
	 * may still be in the process of executing tasks.
	 * This transition happens when the threadpool needs
	 * to shrink and needs to kill active threads in order
	 * to do so.
	 */
	ZOMBIE,
	/*!
	 * The worker has been asked to shut down. Typically
	 * only idle threads go to this state directly, but
	 * active threads may go straight to this state when
	 * the threadpool is shut down.
	 */
	DEAD,
};

/*!
 * A thread that executes threadpool tasks
 */
struct worker_thread {
	/*! A unique (within a run of Asterisk) ID for the thread. Used for hashing and searching */
	int id;
	/*! Condition used in conjunction with state changes */
	ast_cond_t cond;
	/*! Lock used alongside the condition for state changes */
	ast_mutex_t lock;
	/*! The actual thread that is executing tasks */
	pthread_t thread;
	/*! A pointer to the threadpool. Needed to be able to execute tasks */
	struct ast_threadpool *pool;
	/*! The current state of the worker thread */
	enum worker_state state;
	/*! A boolean used to determine if an idle thread should become active */
	int wake_up;
	/*! Options for this threadpool */
	struct ast_threadpool_options options;
};

/* Worker thread forward declarations. See definitions for documentation */
static int worker_thread_hash(const void *obj, int flags);
static int worker_thread_cmp(void *obj, void *arg, int flags);
static void worker_thread_destroy(void *obj);
static void worker_active(struct worker_thread *worker);
static void *worker_start(void *arg);
static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool);
static int worker_thread_start(struct worker_thread *worker);
static int worker_idle(struct worker_thread *worker);
static void worker_set_state(struct worker_thread *worker, enum worker_state state);
static void worker_shutdown(struct worker_thread *worker);

/*!
 * \brief Notify the threadpool listener that the state has changed.
 *
 * This notifies the threadpool listener via its state_changed callback.
 * \param pool The threadpool whose state has changed
 */
static void threadpool_send_state_changed(struct ast_threadpool *pool)
{
	int active_size = ao2_container_count(pool->active_threads);
	int idle_size = ao2_container_count(pool->idle_threads);

	if (pool->listener && pool->listener->callbacks->state_changed) {
		pool->listener->callbacks->state_changed(pool, pool->listener, active_size, idle_size);
	}
}

/*!
 * \brief Struct used for queued operations involving worker state changes
 */
struct thread_worker_pair {
	/*! Threadpool that contains the worker whose state has changed */
	struct ast_threadpool *pool;
	/*! Worker whose state has changed */
	struct worker_thread *worker;
};

/*!
 * \brief Destructor for thread_worker_pair
 */
static void thread_worker_pair_destructor(void *obj)
{
	struct thread_worker_pair *pair = obj;

	ao2_ref(pair->worker, -1);
}

/*!
 * \brief Allocate and initialize a thread_worker_pair
 * \param pool Threadpool to assign to the thread_worker_pair
 * \param worker Worker thread to assign to the thread_worker_pair
 */
static struct thread_worker_pair *thread_worker_pair_alloc(struct ast_threadpool *pool,
		struct worker_thread *worker)
{
	struct thread_worker_pair *pair = ao2_alloc(sizeof(*pair), thread_worker_pair_destructor);

	if (!pair) {
		return NULL;
	}
	pair->pool = pool;
	ao2_ref(worker, +1);
	pair->worker = worker;
	return pair;
}

/*!
 * \brief Move a worker thread from the active container to the idle container.
 *
 * This function is called from the threadpool's control taskprocessor thread.
 * \param data A thread_worker_pair containing the threadpool and the worker to move.
 * \return 0
 */
static int queued_active_thread_idle(void *data)
{
	struct thread_worker_pair *pair = data;

	ao2_link(pair->pool->idle_threads, pair->worker);
	ao2_unlink(pair->pool->active_threads, pair->worker);
	threadpool_send_state_changed(pair->pool);
	ao2_ref(pair, -1);
	return 0;
}

/*!
 * \brief Queue a task to move a thread from the active list to the idle list
 *
 * This is called by a worker thread when it runs out of tasks to perform and
 * goes idle.
 * \param pool The threadpool to which the worker belongs
 * \param worker The worker thread that has gone idle
 */
static void threadpool_active_thread_idle(struct ast_threadpool *pool,
		struct worker_thread *worker)
{
	struct thread_worker_pair *pair;
	SCOPED_AO2LOCK(lock, pool);

	if (pool->shutting_down) {
		return;
	}
	pair = thread_worker_pair_alloc(pool, worker);
	if (!pair) {
		return;
	}
	ast_taskprocessor_push(pool->control_tps, queued_active_thread_idle, pair);
}

/*!
 * \brief Kill a zombie thread
 *
 * This runs from the threadpool's control taskprocessor thread.
 *
 * \param data A thread_worker_pair containing the threadpool and the zombie thread
 * \return 0
 */
static int queued_zombie_thread_dead(void *data)
{
	struct thread_worker_pair *pair = data;

	ao2_unlink(pair->pool->zombie_threads, pair->worker);
	threadpool_send_state_changed(pair->pool);
	ao2_ref(pair, -1);
	return 0;
}

/*!
 * \brief Queue a task to kill a zombie thread
 *
 * This is called by a worker thread when it acknowledges that it is time for
 * it to die.
 */
static void threadpool_zombie_thread_dead(struct ast_threadpool *pool,
		struct worker_thread *worker)
{
	struct thread_worker_pair *pair;
	SCOPED_AO2LOCK(lock, pool);

	if (pool->shutting_down) {
		return;
	}
	pair = thread_worker_pair_alloc(pool, worker);
	if (!pair) {
		return;
	}
	ast_taskprocessor_push(pool->control_tps, queued_zombie_thread_dead, pair);
}

static int queued_idle_thread_dead(void *data)
{
	struct thread_worker_pair *pair = data;

	ao2_unlink(pair->pool->idle_threads, pair->worker);
	threadpool_send_state_changed(pair->pool);
	ao2_ref(pair, -1);
	return 0;
}

static void threadpool_idle_thread_dead(struct ast_threadpool *pool,
		struct worker_thread *worker)
{
	struct thread_worker_pair *pair;
	SCOPED_AO2LOCK(lock, pool);

	if (pool->shutting_down) {
		return;
	}
	pair = thread_worker_pair_alloc(pool, worker);
	if (!pair) {
		return;
	}
	ast_taskprocessor_push(pool->control_tps, queued_idle_thread_dead, pair);
}

/*!
 * \brief Execute a task in the threadpool
 *
 * This is the function that worker threads call in order to execute tasks
 * in the threadpool.
 *
 * \param pool The pool to which the tasks belong.
 * \retval 0 Either the pool has been shut down or there are no tasks.
 * \retval 1 There are still tasks remaining in the pool.
 */
static int threadpool_execute(struct ast_threadpool *pool)
{
	ao2_lock(pool);
	if (!pool->shutting_down) {
		ao2_unlock(pool);
		return ast_taskprocessor_execute(pool->tps);
	}
	ao2_unlock(pool);
	return 0;
}

/*!
 * \brief Destroy a threadpool's components.
 *
 * This is the destructor called automatically when the threadpool's
 * reference count reaches zero. This is not to be confused with
 * shutting the pool down via ast_threadpool_shutdown().
 *
 * By the time this actually gets called, most of the cleanup has already
 * been done in the pool. The only thing left to do is to release the
 * final reference to the threadpool listener.
 *
 * \param obj The pool to destroy
 */
static void threadpool_destructor(void *obj)
{
	struct ast_threadpool *pool = obj;

	ao2_cleanup(pool->listener);
}

/*!
 * \brief Allocate a threadpool
 *
 * This is implemented as a taskprocessor listener's alloc callback. This
 * is because the threadpool exists as the private data on a taskprocessor
 * listener.
 *
 * \param name The name of the threadpool.
 * \param options The options the threadpool uses.
 * \retval NULL Could not initialize threadpool properly
 * \retval non-NULL The newly-allocated threadpool
 */
static struct ast_threadpool *threadpool_alloc(const char *name, const struct ast_threadpool_options *options)
{
	RAII_VAR(struct ast_threadpool *, pool, NULL, ao2_cleanup);
	struct ast_str *control_tps_name;

	pool = ao2_alloc(sizeof(*pool), threadpool_destructor);
	control_tps_name = ast_str_create(64);
	if (!pool || !control_tps_name) {
		ast_free(control_tps_name);
		return NULL;
	}

	ast_str_set(&control_tps_name, 0, "%s-control", name);

	pool->control_tps = ast_taskprocessor_get(ast_str_buffer(control_tps_name), TPS_REF_DEFAULT);
	ast_free(control_tps_name);
	if (!pool->control_tps) {
		return NULL;
	}
	pool->active_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp);
	if (!pool->active_threads) {
		return NULL;
	}
	pool->idle_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp);
	if (!pool->idle_threads) {
		return NULL;
	}
	pool->zombie_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp);
	if (!pool->zombie_threads) {
		return NULL;
	}
	pool->options = *options;

	ao2_ref(pool, +1);
	return pool;
}

static int threadpool_tps_start(struct ast_taskprocessor_listener *listener)
{
	return 0;
}

/*!
 * \brief Helper used for queued task when tasks are pushed
 */
struct task_pushed_data {
	/*! Pool into which a task was pushed */
	struct ast_threadpool *pool;
	/*! Indicator of whether the pool had no tasks prior to the new task being added */
	int was_empty;
};

/*!
 * \brief Allocate and initialize a task_pushed_data
 * \param pool The threadpool to set in the task_pushed_data
 * \param was_empty The was_empty value to set in the task_pushed_data
 * \retval NULL Unable to allocate task_pushed_data
 * \retval non-NULL The newly-allocated task_pushed_data
 */
static struct task_pushed_data *task_pushed_data_alloc(struct ast_threadpool *pool,
		int was_empty)
{
	struct task_pushed_data *tpd = ao2_alloc(sizeof(*tpd), NULL);

	if (!tpd) {
		return NULL;
	}
	tpd->pool = pool;
	tpd->was_empty = was_empty;
	return tpd;
}

/*!
 * \brief Activate idle threads
 *
 * This function always returns CMP_MATCH because all workers that this
 * function acts on need to be seen as matches so they are unlinked from the
 * list of idle threads.
 *
 * Called as an ao2_callback in the threadpool's control taskprocessor thread.
 * \param obj The worker to activate
 * \param arg The pool where the worker belongs
 * \retval CMP_MATCH
 */
static int activate_thread(void *obj, void *arg, int flags)
{
	struct worker_thread *worker = obj;
	struct ast_threadpool *pool = arg;

	if (!ao2_link(pool->active_threads, worker)) {
		/* If we can't link the idle thread into the active container, then
		 * we'll just leave the thread idle and not wake it up.
		 */
		ast_log(LOG_WARNING, "Failed to activate thread %d. Remaining idle\n",
			worker->id);
		return 0;
	}
	worker_set_state(worker, ALIVE);
	return CMP_MATCH;
}

/*!
 * \brief Add threads to the threadpool
 *
 * This function is called from the threadpool's control taskprocessor thread.
 * \param pool The pool that is expanding
 * \param delta The number of threads to add to the pool
 */
static void grow(struct ast_threadpool *pool, int delta)
{
	int i;
	int current_size = ao2_container_count(pool->active_threads) +
		ao2_container_count(pool->idle_threads);

	if (pool->options.max_size && current_size + delta > pool->options.max_size) {
		delta = pool->options.max_size - current_size;
	}

	ast_debug(3, "Increasing threadpool %s's size by %d\n",
		ast_taskprocessor_name(pool->tps), delta);

	for (i = 0; i < delta; ++i) {
		struct worker_thread *worker = worker_thread_alloc(pool);
		if (!worker) {
			return;
		}
		if (ao2_link(pool->idle_threads, worker)) {
			if (worker_thread_start(worker)) {
				ast_log(LOG_ERROR, "Unable to start worker thread %d. Destroying.\n", worker->id);
				/* The worker was linked as idle above, so it must be removed
				 * from the idle container if its thread fails to start. */
				ao2_unlink(pool->idle_threads, worker);
			}
		} else {
			ast_log(LOG_WARNING, "Failed to activate worker thread %d. Destroying.\n", worker->id);
		}
		ao2_ref(worker, -1);
	}
}

/*!
 * \brief Queued task called when tasks are pushed into the threadpool
 *
 * This function first calls into the threadpool's listener to let it know
 * that a task has been pushed. It then wakes up all idle threads and moves
 * them into the active thread container.
 * \param data A task_pushed_data
 * \return 0
 */
static int queued_task_pushed(void *data)
{
	struct task_pushed_data *tpd = data;
	struct ast_threadpool *pool = tpd->pool;
	int was_empty = tpd->was_empty;

	if (pool->listener && pool->listener->callbacks->task_pushed) {
		pool->listener->callbacks->task_pushed(pool, pool->listener, was_empty);
	}
	if (ao2_container_count(pool->idle_threads) == 0) {
		if (!pool->options.auto_increment) {
			/* Drop the reference we were handed before bailing out early */
			ao2_ref(tpd, -1);
			return 0;
		}
		grow(pool, pool->options.auto_increment);
	}
	ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA,
		activate_thread, pool);
	threadpool_send_state_changed(pool);
	ao2_ref(tpd, -1);
	return 0;
}

/*!
 * \brief Taskprocessor listener callback called when a task is added
 *
 * The threadpool uses this opportunity to queue a task on its control taskprocessor
 * in order to activate idle threads and notify the threadpool listener that the
 * task has been pushed.
 * \param listener The taskprocessor listener. The threadpool is the listener's private data
 * \param was_empty True if the taskprocessor was empty prior to the task being pushed
 */
static void threadpool_tps_task_pushed(struct ast_taskprocessor_listener *listener,
		int was_empty)
{
	struct ast_threadpool *pool = ast_taskprocessor_listener_get_user_data(listener);
	struct task_pushed_data *tpd;
	SCOPED_AO2LOCK(lock, pool);

	if (pool->shutting_down) {
		return;
	}
	tpd = task_pushed_data_alloc(pool, was_empty);
	if (!tpd) {
		return;
	}
	ast_taskprocessor_push(pool->control_tps, queued_task_pushed, tpd);
}

/*!
 * \brief Queued task that handles the case where the threadpool's taskprocessor is emptied
 *
 * This simply lets the threadpool's listener know that the threadpool is devoid of tasks.
 * \param data The pool that has become empty
 * \return 0
 */
static int queued_emptied(void *data)
{
	struct ast_threadpool *pool = data;

	/* We already checked for existence of this callback when this was queued */
	pool->listener->callbacks->emptied(pool, pool->listener);
	return 0;
}

/*!
 * \brief Taskprocessor listener emptied callback
 *
 * The threadpool queues a task to let the threadpool listener know that
 * the threadpool no longer contains any tasks.
 * \param listener The taskprocessor listener. The threadpool is the listener's private data.
 */
static void threadpool_tps_emptied(struct ast_taskprocessor_listener *listener)
{
	struct ast_threadpool *pool = ast_taskprocessor_listener_get_user_data(listener);
	SCOPED_AO2LOCK(lock, pool);

	if (pool->shutting_down) {
		return;
	}
	if (pool->listener && pool->listener->callbacks->emptied) {
		ast_taskprocessor_push(pool->control_tps, queued_emptied, pool);
	}
}

/*!
 * \brief Taskprocessor listener shutdown callback
 *
 * The threadpool will shut down and destroy all of its worker threads when
 * this is called back. By the time this gets called, the threadpool's
 * control taskprocessor has already been destroyed. Therefore there is no risk
 * in outright destroying the worker threads here.
 * \param listener The taskprocessor listener. The threadpool is the listener's private data.
 */
static void threadpool_tps_shutdown(struct ast_taskprocessor_listener *listener)
{
	struct ast_threadpool *pool = ast_taskprocessor_listener_get_user_data(listener);

	if (pool->listener && pool->listener->callbacks->shutdown) {
		pool->listener->callbacks->shutdown(pool->listener);
	}
	ao2_cleanup(pool->active_threads);
	ao2_cleanup(pool->idle_threads);
	ao2_cleanup(pool->zombie_threads);
	ao2_cleanup(pool);
}

/*!
 * \brief Table of taskprocessor listener callbacks for threadpool's main taskprocessor
 */
static struct ast_taskprocessor_listener_callbacks threadpool_tps_listener_callbacks = {
	.start = threadpool_tps_start,
	.task_pushed = threadpool_tps_task_pushed,
	.emptied = threadpool_tps_emptied,
	.shutdown = threadpool_tps_shutdown,
};

/*!
 * \brief ao2 callback to kill a set number of threads.
 *
 * Threads will be unlinked from the container as long as the
 * counter has not reached zero. The counter is decremented with
 * each thread that is removed.
 * \param obj The worker thread up for possible destruction
 * \param arg The counter
 * \param flags Unused
 * \retval CMP_MATCH The counter has not reached zero, so this thread should be removed.
 * \retval CMP_STOP The counter has reached zero so no more threads should be removed.
 */
static int kill_threads(void *obj, void *arg, int flags)
{
	int *num_to_kill = arg;

	if (*num_to_kill > 0) {
		--(*num_to_kill);
		return CMP_MATCH;
	} else {
		return CMP_STOP;
	}
}

/*!
 * \brief ao2 callback to zombify a set number of threads.
 *
 * Threads will be zombified as long as the counter has not reached
 * zero. The counter is decremented with each thread that is zombified.
 *
 * Zombifying a thread involves removing it from its current container,
 * adding it to the zombie container, and changing the state of the
 * worker to a zombie.
 *
 * This callback is called from the threadpool control taskprocessor thread.
 *
 * \param obj The worker thread that may be zombified
 * \param arg The pool to which the worker belongs
 * \param data The counter
 * \param flags Unused
 * \retval CMP_MATCH The zombified thread should be removed from its current container
 * \retval CMP_STOP Stop attempting to zombify threads
 */
static int zombify_threads(void *obj, void *arg, void *data, int flags)
{
	struct worker_thread *worker = obj;
	struct ast_threadpool *pool = arg;
	int *num_to_zombify = data;

	if ((*num_to_zombify)-- > 0) {
		if (!ao2_link(pool->zombie_threads, worker)) {
			ast_log(LOG_WARNING, "Failed to zombify active thread %d. Thread will remain active\n", worker->id);
			return 0;
		}
		worker_set_state(worker, ZOMBIE);
		return CMP_MATCH;
	} else {
		return CMP_STOP;
	}
}

/*!
 * \brief Remove threads from the threadpool
 *
 * The preference is to kill idle threads. However, if there are
 * more threads to remove than there are idle threads, then active
 * threads will be zombified instead.
 *
 * This function is called from the threadpool control taskprocessor thread.
 *
 * \param pool The threadpool to remove threads from
 * \param delta The number of threads to remove
 */
static void shrink(struct ast_threadpool *pool, int delta)
{
	/*
	 * Preference is to kill idle threads, but
	 * we'll move on to deactivating active threads
	 * if we have to.
	 */
	int idle_threads = ao2_container_count(pool->idle_threads);
	int idle_threads_to_kill = MIN(delta, idle_threads);
	int active_threads_to_zombify = delta - idle_threads_to_kill;

	ast_debug(3, "Destroying %d idle threads in threadpool %s\n", idle_threads_to_kill,
		ast_taskprocessor_name(pool->tps));

	ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
		kill_threads, &idle_threads_to_kill);

	ast_debug(3, "Destroying %d active threads in threadpool %s\n", active_threads_to_zombify,
		ast_taskprocessor_name(pool->tps));

	ao2_callback_data(pool->active_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
		zombify_threads, pool, &active_threads_to_zombify);
}

/*!
 * \brief Helper struct used for queued operations that change the size of the threadpool
 */
struct set_size_data {
	/*! The pool whose size is to change */
	struct ast_threadpool *pool;
	/*! The requested new size of the pool */
	unsigned int size;
};

/*!
 * \brief Allocate and initialize a set_size_data
 * \param pool The pool for the set_size_data
 * \param size The size to store in the set_size_data
 */
static struct set_size_data *set_size_data_alloc(struct ast_threadpool *pool,
		unsigned int size)
{
	struct set_size_data *ssd = ao2_alloc(sizeof(*ssd), NULL);

	if (!ssd) {
		return NULL;
	}
	ssd->pool = pool;
	ssd->size = size;
	return ssd;
}

/*!
 * \brief Change the size of the threadpool
 *
 * This can either result in shrinking or growing the threadpool depending
 * on the new desired size and the current size.
 *
 * This function is run from the threadpool control taskprocessor thread.
 *
 * \param data A set_size_data used for determining how to act
 * \return 0
 */
static int queued_set_size(void *data)
{
	RAII_VAR(struct set_size_data *, ssd, data, ao2_cleanup);
	struct ast_threadpool *pool = ssd->pool;
	unsigned int num_threads = ssd->size;
	/* We don't count zombie threads as being "live" when potentially resizing */
	unsigned int current_size = ao2_container_count(pool->active_threads) +
		ao2_container_count(pool->idle_threads);

	if (current_size == num_threads) {
		ast_debug(3, "Not changing threadpool size since new size %u is the same as current %u\n",
			num_threads, current_size);
		return 0;
	}

	if (current_size < num_threads) {
		grow(pool, num_threads - current_size);
		ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
			activate_thread, pool);
	} else {
		shrink(pool, current_size - num_threads);
	}

	threadpool_send_state_changed(pool);
	return 0;
}
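
/*!
 * \internal
 * \note Usage sketch added for illustration (my_pool is hypothetical).
 * Resizing is asynchronous: the call below only queues queued_set_size() on
 * the control taskprocessor, so the pool converges on the new size shortly
 * afterwards rather than immediately.
 *
 * \code
 * ast_threadpool_set_size(my_pool, 10);
 * \endcode
 */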

void ast_threadpool_set_size(struct ast_threadpool *pool, unsigned int size)
{
	struct set_size_data *ssd;
	SCOPED_AO2LOCK(lock, pool);

	if (pool->shutting_down) {
		return;
	}
	ssd = set_size_data_alloc(pool, size);
	if (!ssd) {
		return;
	}
	ast_taskprocessor_push(pool->control_tps, queued_set_size, ssd);
}

struct ast_threadpool_listener *ast_threadpool_listener_alloc(
	const struct ast_threadpool_listener_callbacks *callbacks, void *user_data)
{
	struct ast_threadpool_listener *listener = ao2_alloc(sizeof(*listener), NULL);

	if (!listener) {
		return NULL;
	}
	listener->callbacks = callbacks;
	listener->user_data = user_data;
	return listener;
}

void *ast_threadpool_listener_get_user_data(const struct ast_threadpool_listener *listener)
{
	return listener->user_data;
}

struct pool_options_pair {
	struct ast_threadpool *pool;
	struct ast_threadpool_options options;
};
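
/*!
 * \internal
 * \note A usage sketch added for illustration; the option values and the name
 * "my-pool" are hypothetical, not taken from the original file. Passing a NULL
 * listener is acceptable since every listener callback is NULL-checked before
 * use.
 *
 * \code
 * struct ast_threadpool_options options = {
 *	.version = AST_THREADPOOL_OPTIONS_VERSION,
 *	.idle_timeout = 60,
 *	.auto_increment = 1,
 *	.initial_size = 0,
 *	.max_size = 20,
 * };
 * struct ast_threadpool *my_pool;
 *
 * my_pool = ast_threadpool_create("my-pool", NULL, &options);
 * if (!my_pool) {
 *	ast_log(LOG_ERROR, "Unable to create threadpool\n");
 * }
 * \endcode
 */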

struct ast_threadpool *ast_threadpool_create(const char *name,
	struct ast_threadpool_listener *listener,
	const struct ast_threadpool_options *options)
{
	struct ast_taskprocessor *tps;
	RAII_VAR(struct ast_taskprocessor_listener *, tps_listener, NULL, ao2_cleanup);
	RAII_VAR(struct ast_threadpool *, pool, NULL, ao2_cleanup);

	pool = threadpool_alloc(name, options);
	if (!pool) {
		return NULL;
	}
	tps_listener = ast_taskprocessor_listener_alloc(&threadpool_tps_listener_callbacks, pool);
	if (!tps_listener) {
		return NULL;
	}
	if (options->version != AST_THREADPOOL_OPTIONS_VERSION) {
		ast_log(LOG_WARNING, "Incompatible version of threadpool options in use.\n");
		return NULL;
	}
	tps = ast_taskprocessor_create_with_listener(name, tps_listener);
	if (!tps) {
		return NULL;
	}
	pool->tps = tps;
	if (listener) {
		ao2_ref(listener, +1);
		pool->listener = listener;
	}
	ast_threadpool_set_size(pool, pool->options.initial_size);
	ao2_ref(pool, +1);
	return pool;
}
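
/*!
 * \internal
 * \note Illustrative sketch, not original documentation; simple_task, my_pool
 * and some_data are hypothetical. A non-zero return means the task could not
 * be queued, for example because the pool is shutting down.
 *
 * \code
 * static int simple_task(void *data)
 * {
 *	ast_debug(3, "Running task with data %p\n", data);
 *	return 0;
 * }
 *
 * if (ast_threadpool_push(my_pool, simple_task, some_data)) {
 *	ast_log(LOG_WARNING, "Unable to push task to the threadpool\n");
 * }
 * \endcode
 */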

int ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), void *data)
{
	SCOPED_AO2LOCK(lock, pool);

	if (!pool->shutting_down) {
		return ast_taskprocessor_push(pool->tps, task, data);
	}
	return -1;
}
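
/*!
 * \internal
 * \note Shutdown sketch added for illustration (my_pool is hypothetical).
 * Unreferencing the two taskprocessors below eventually triggers
 * threadpool_tps_shutdown(), which releases the thread containers and the
 * pool reference it holds, so no separate destroy call is needed.
 *
 * \code
 * ast_threadpool_shutdown(my_pool);
 * my_pool = NULL;
 * \endcode
 */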

void ast_threadpool_shutdown(struct ast_threadpool *pool)
{
	if (!pool) {
		return;
	}
	/* Shut down the taskprocessors and everything else just
	 * takes care of itself via the taskprocessor callbacks
	 */
	ao2_lock(pool);
	pool->shutting_down = 1;
	ao2_unlock(pool);
	ast_taskprocessor_unreference(pool->control_tps);
	ast_taskprocessor_unreference(pool->tps);
}

/*!
 * A monotonically increasing integer used for worker
 * thread identification.
 */
static int worker_id_counter;

static int worker_thread_hash(const void *obj, int flags)
{
	const struct worker_thread *worker = obj;

	return worker->id;
}

static int worker_thread_cmp(void *obj, void *arg, int flags)
{
	struct worker_thread *worker1 = obj;
	struct worker_thread *worker2 = arg;

	return worker1->id == worker2->id ? CMP_MATCH : 0;
}

/*!
 * \brief shut a worker thread down
 *
 * Set the worker dead and then wait for its thread
 * to finish executing.
 *
 * \param worker The worker thread to shut down
 */
static void worker_shutdown(struct worker_thread *worker)
{
	worker_set_state(worker, DEAD);
	if (worker->thread != AST_PTHREADT_NULL) {
		pthread_join(worker->thread, NULL);
		worker->thread = AST_PTHREADT_NULL;
	}
}

/*!
 * \brief Worker thread destructor
 *
 * Called automatically when the refcount reaches 0. Shuts
 * down the worker thread and destroys its component
 * parts.
 */
static void worker_thread_destroy(void *obj)
{
	struct worker_thread *worker = obj;

	ast_debug(3, "Destroying worker thread %d\n", worker->id);
	worker_shutdown(worker);
	ast_mutex_destroy(&worker->lock);
	ast_cond_destroy(&worker->cond);
}

/*!
 * \brief start point for worker threads
 *
 * Worker threads start in the active state but may
 * immediately go idle if there is no work to be
 * done.
 *
 * \param arg The worker thread
 * \retval NULL
 */
static void *worker_start(void *arg)
{
	struct worker_thread *worker = arg;

	if (worker->options.thread_start) {
		worker->options.thread_start();
	}

	ast_mutex_lock(&worker->lock);
	while (worker_idle(worker)) {
		ast_mutex_unlock(&worker->lock);
		worker_active(worker);
		ast_mutex_lock(&worker->lock);
		if (worker->state != ALIVE) {
			break;
		}
		threadpool_active_thread_idle(worker->pool, worker);
	}
	ast_mutex_unlock(&worker->lock);

	/* Reaching this portion means the thread is
	 * on death's door. It may have been killed while
	 * it was idle, in which case it can just die
	 * peacefully. If it's a zombie, though, then
	 * it needs to let the pool know so
	 * that the thread can be removed from the
	 * list of zombie threads.
	 */
	if (worker->state == ZOMBIE) {
		threadpool_zombie_thread_dead(worker->pool, worker);
	}
	if (worker->options.thread_end) {
		worker->options.thread_end();
	}
	return NULL;
}

/*!
 * \brief Allocate and initialize a new worker thread
 *
 * This creates and initializes the worker; the underlying thread itself is
 * started separately via worker_thread_start().
 *
 * \param pool The threadpool to which the worker will be added
 * \retval NULL Failed to allocate the worker thread
 * \retval non-NULL The newly-created worker thread
 */
static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool)
{
	struct worker_thread *worker = ao2_alloc(sizeof(*worker), worker_thread_destroy);

	if (!worker) {
		return NULL;
	}
	worker->id = ast_atomic_fetchadd_int(&worker_id_counter, 1);
	ast_mutex_init(&worker->lock);
	ast_cond_init(&worker->cond, NULL);
	worker->pool = pool;
	worker->thread = AST_PTHREADT_NULL;
	worker->state = ALIVE;
	worker->options = pool->options;
	return worker;
}

static int worker_thread_start(struct worker_thread *worker)
{
	return ast_pthread_create(&worker->thread, NULL, worker_start, worker);
}

/*!
 * \brief Active loop for worker threads
 *
 * The worker will stay in this loop for its lifetime,
 * executing tasks as they become available. If there
 * are no tasks currently available, then the thread
 * will go idle.
 *
 * \param worker The worker thread executing tasks.
 */
static void worker_active(struct worker_thread *worker)
{
	int alive;

	/* The following is equivalent to
	 *
	 * while (threadpool_execute(worker->pool));
	 *
	 * However, reviewers have suggested in the past
	 * that doing so can cause optimizers to (wrongly)
	 * optimize the code away.
	 */
	do {
		alive = threadpool_execute(worker->pool);
	} while (alive);
}

/*!
 * \brief Idle function for worker threads
 *
 * The worker waits here until it gets told by the threadpool
 * to wake up.
 *
 * worker is locked before entering this function.
 *
 * \param worker The idle worker
 * \retval 0 The thread is being woken up so that it can conclude.
 * \retval non-zero The thread is being woken up to do more work.
 */
static int worker_idle(struct worker_thread *worker)
{
	struct timeval start = ast_tvnow();
	struct timespec end = {
		.tv_sec = start.tv_sec + worker->options.idle_timeout,
		.tv_nsec = start.tv_usec * 1000,
	};

	while (!worker->wake_up) {
		if (worker->options.idle_timeout <= 0) {
			ast_cond_wait(&worker->cond, &worker->lock);
		} else if (ast_cond_timedwait(&worker->cond, &worker->lock, &end) == ETIMEDOUT) {
			break;
		}
	}

	if (!worker->wake_up) {
		ast_debug(1, "Worker thread idle timeout reached. Dying.\n");
		threadpool_idle_thread_dead(worker->pool, worker);
		worker->state = DEAD;
	}
	worker->wake_up = 0;
	return worker->state == ALIVE;
}

/*!
 * \brief Change a worker's state
 *
 * The threadpool calls into this function in order to let a worker know
 * how it should proceed.
 */
static void worker_set_state(struct worker_thread *worker, enum worker_state state)
{
	SCOPED_MUTEX(lock, &worker->lock);

	worker->state = state;
	worker->wake_up = 1;
	ast_cond_signal(&worker->cond);
}

struct serializer {
	struct ast_threadpool *pool;
};

static void serializer_dtor(void *obj)
{
	struct serializer *ser = obj;

	ao2_cleanup(ser->pool);
	ser->pool = NULL;
}

static struct serializer *serializer_create(struct ast_threadpool *pool)
{
	struct serializer *ser;

	ser = ao2_alloc_options(sizeof(*ser), serializer_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
	if (!ser) {
		return NULL;
	}
	ao2_ref(pool, +1);
	ser->pool = pool;
	return ser;
}

static int execute_tasks(void *data)
{
	struct ast_taskprocessor *tps = data;

	while (ast_taskprocessor_execute(tps)) {
		/* No-op */
	}
	ast_taskprocessor_unreference(tps);
	return 0;
}

static void serializer_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
{
	if (was_empty) {
		struct serializer *ser = ast_taskprocessor_listener_get_user_data(listener);
		struct ast_taskprocessor *tps = ast_taskprocessor_listener_get_tps(listener);

		if (ast_threadpool_push(ser->pool, execute_tasks, tps)) {
			ast_taskprocessor_unreference(tps);
		}
	}
}

static int serializer_start(struct ast_taskprocessor_listener *listener)
{
	/* No-op */
	return 0;
}

static void serializer_shutdown(struct ast_taskprocessor_listener *listener)
{
	struct serializer *ser = ast_taskprocessor_listener_get_user_data(listener);

	ao2_cleanup(ser);
}

static struct ast_taskprocessor_listener_callbacks serializer_tps_listener_callbacks = {
	.task_pushed = serializer_task_pushed,
	.start = serializer_start,
	.shutdown = serializer_shutdown,
};
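
/*!
 * \internal
 * \note Usage sketch added for illustration; "my-serializer", my_pool,
 * simple_task and some_data are hypothetical. A serializer behaves like a
 * taskprocessor, but its tasks execute, in order, on the threadpool's worker
 * threads instead of on a dedicated thread of its own.
 *
 * \code
 * struct ast_taskprocessor *serializer;
 *
 * serializer = ast_threadpool_serializer("my-serializer", my_pool);
 * if (serializer) {
 *	ast_taskprocessor_push(serializer, simple_task, some_data);
 * }
 * \endcode
 *
 * When the serializer goes from empty to non-empty, serializer_task_pushed()
 * above queues execute_tasks() onto the pool, which then drains the
 * serializer's queue on a single pool thread.
 */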

struct ast_taskprocessor *ast_threadpool_serializer(const char *name, struct ast_threadpool *pool)
{
	RAII_VAR(struct serializer *, ser, NULL, ao2_cleanup);
	RAII_VAR(struct ast_taskprocessor_listener *, listener, NULL, ao2_cleanup);
	struct ast_taskprocessor *tps = NULL;

	ser = serializer_create(pool);
	if (!ser) {
		return NULL;
	}
	listener = ast_taskprocessor_listener_alloc(&serializer_tps_listener_callbacks, ser);
	if (!listener) {
		return NULL;
	}
	ser = NULL; /* ownership transferred to listener */

	tps = ast_taskprocessor_create_with_listener(name, listener);
	if (!tps) {
		return NULL;
	}
	return tps;
}