taskprocessor.c 21 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806
  1. /*
  2. * Asterisk -- An open source telephony toolkit.
  3. *
  4. * Copyright (C) 2007-2013, Digium, Inc.
  5. *
  6. * Dwayne M. Hubbard <dhubbard@digium.com>
  7. *
  8. * See http://www.asterisk.org for more information about
  9. * the Asterisk project. Please do not directly contact
  10. * any of the maintainers of this project for assistance;
  11. * the project provides a web site, mailing lists and IRC
  12. * channels for your use.
  13. *
  14. * This program is free software, distributed under the terms of
  15. * the GNU General Public License Version 2. See the LICENSE file
  16. * at the top of the source tree.
  17. */
  18. /*!
  19. * \file
  20. * \brief Maintain a container of uniquely-named taskprocessor threads that can be shared across modules.
  21. *
  22. * \author Dwayne Hubbard <dhubbard@digium.com>
  23. */
  24. /*** MODULEINFO
  25. <support_level>core</support_level>
  26. ***/
  27. #include "asterisk.h"
  28. ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
  29. #include "asterisk/_private.h"
  30. #include "asterisk/module.h"
  31. #include "asterisk/time.h"
  32. #include "asterisk/astobj2.h"
  33. #include "asterisk/cli.h"
  34. #include "asterisk/taskprocessor.h"
  35. #include "asterisk/sem.h"
  36. /*!
  37. * \brief tps_task structure is queued to a taskprocessor
  38. *
  39. * tps_tasks are processed in FIFO order and freed by the taskprocessing
  40. * thread after the task handler returns. The callback function that is assigned
  41. * to the execute() function pointer is responsible for releasing datap resources if necessary.
  42. */
  43. struct tps_task {
  44. /*! \brief The execute() task callback function pointer */
  45. union {
  46. int (*execute)(void *datap);
  47. int (*execute_local)(struct ast_taskprocessor_local *local);
  48. } callback;
  49. /*! \brief The data pointer for the task execute() function */
  50. void *datap;
  51. /*! \brief AST_LIST_ENTRY overhead */
  52. AST_LIST_ENTRY(tps_task) list;
  53. unsigned int wants_local:1;
  54. };
  /*! \brief Counters kept for each taskprocessor and shown by the CLI report. */
  struct tps_taskprocessor_stats {
      /*! \brief Largest queue depth recorded so far */
      unsigned long max_qsize;
      /*! \brief Number of tasks that have been executed */
      unsigned long _tasks_processed_count;
  };
  62. /*! \brief A ast_taskprocessor structure is a singleton by name */
  63. struct ast_taskprocessor {
  64. /*! \brief Friendly name of the taskprocessor */
  65. const char *name;
  66. /*! \brief Taskprocessor statistics */
  67. struct tps_taskprocessor_stats *stats;
  68. void *local_data;
  69. /*! \brief Taskprocessor current queue size */
  70. long tps_queue_size;
  71. /*! \brief Taskprocessor queue */
  72. AST_LIST_HEAD_NOLOCK(tps_queue, tps_task) tps_queue;
  73. struct ast_taskprocessor_listener *listener;
  74. /*! Current thread executing the tasks */
  75. pthread_t thread;
  76. /*! Indicates if the taskprocessor is currently executing a task */
  77. unsigned int executing:1;
  78. };
  /*!
   * \brief A listener for taskprocessors
   *
   * \since 12.0.0
   *
   * The taskprocessor calls into its listener whenever its state changes
   * (for example when a task is pushed), which lets the module that owns
   * the taskprocessor decide how and when queued tasks get run.
   */
  struct ast_taskprocessor_listener {
      /*! Callback table the taskprocessor uses to report state changes */
      const struct ast_taskprocessor_listener_callbacks *callbacks;
      /*! The taskprocessor this listener is attached to */
      struct ast_taskprocessor *tps;
      /*! Listener-private data, opaque to the taskprocessor core */
      void *user_data;
  };
  97. #define TPS_MAX_BUCKETS 7
  98. /*! \brief tps_singletons is the astobj2 container for taskprocessor singletons */
  99. static struct ao2_container *tps_singletons;
  100. /*! \brief CLI <example>taskprocessor ping &lt;blah&gt;</example> operation requires a ping condition */
  101. static ast_cond_t cli_ping_cond;
  102. /*! \brief CLI <example>taskprocessor ping &lt;blah&gt;</example> operation requires a ping condition lock */
  103. AST_MUTEX_DEFINE_STATIC(cli_ping_cond_lock);
  104. /*! \brief The astobj2 hash callback for taskprocessors */
  105. static int tps_hash_cb(const void *obj, const int flags);
  106. /*! \brief The astobj2 compare callback for taskprocessors */
  107. static int tps_cmp_cb(void *obj, void *arg, int flags);
  108. /*! \brief Destroy the taskprocessor when its refcount reaches zero */
  109. static void tps_taskprocessor_destroy(void *tps);
  110. /*! \brief CLI <example>taskprocessor ping &lt;blah&gt;</example> handler function */
  111. static int tps_ping_handler(void *datap);
  112. /*! \brief Remove the front task off the taskprocessor queue */
  113. static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps);
  114. /*! \brief Return the size of the taskprocessor queue */
  115. static int tps_taskprocessor_depth(struct ast_taskprocessor *tps);
  116. static char *cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
  117. static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
  118. static struct ast_cli_entry taskprocessor_clis[] = {
  119. AST_CLI_DEFINE(cli_tps_ping, "Ping a named task processor"),
  120. AST_CLI_DEFINE(cli_tps_report, "List instantiated task processors and statistics"),
  121. };
  122. struct default_taskprocessor_listener_pvt {
  123. pthread_t poll_thread;
  124. int dead;
  125. struct ast_sem sem;
  126. };
  127. static void default_listener_pvt_destroy(struct default_taskprocessor_listener_pvt *pvt)
  128. {
  129. ast_assert(pvt->dead);
  130. ast_sem_destroy(&pvt->sem);
  131. ast_free(pvt);
  132. }
  133. static void default_listener_pvt_dtor(struct ast_taskprocessor_listener *listener)
  134. {
  135. struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
  136. default_listener_pvt_destroy(pvt);
  137. listener->user_data = NULL;
  138. }
  139. /*!
  140. * \brief Function that processes tasks in the taskprocessor
  141. * \internal
  142. */
  143. static void *default_tps_processing_function(void *data)
  144. {
  145. struct ast_taskprocessor_listener *listener = data;
  146. struct ast_taskprocessor *tps = listener->tps;
  147. struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
  148. int sem_value;
  149. int res;
  150. while (!pvt->dead) {
  151. res = ast_sem_wait(&pvt->sem);
  152. if (res != 0 && errno != EINTR) {
  153. ast_log(LOG_ERROR, "ast_sem_wait(): %s\n",
  154. strerror(errno));
  155. /* Just give up */
  156. break;
  157. }
  158. ast_taskprocessor_execute(tps);
  159. }
  160. /* No posting to a dead taskprocessor! */
  161. res = ast_sem_getvalue(&pvt->sem, &sem_value);
  162. ast_assert(res == 0 && sem_value == 0);
  163. /* Free the shutdown reference (see default_listener_shutdown) */
  164. ao2_t_ref(listener->tps, -1, "tps-shutdown");
  165. return NULL;
  166. }
  167. static int default_listener_start(struct ast_taskprocessor_listener *listener)
  168. {
  169. struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
  170. if (ast_pthread_create(&pvt->poll_thread, NULL, default_tps_processing_function, listener)) {
  171. return -1;
  172. }
  173. return 0;
  174. }
  175. static void default_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
  176. {
  177. struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
  178. if (ast_sem_post(&pvt->sem) != 0) {
  179. ast_log(LOG_ERROR, "Failed to notify of enqueued task: %s\n",
  180. strerror(errno));
  181. }
  182. }
  183. static int default_listener_die(void *data)
  184. {
  185. struct default_taskprocessor_listener_pvt *pvt = data;
  186. pvt->dead = 1;
  187. return 0;
  188. }
  189. static void default_listener_shutdown(struct ast_taskprocessor_listener *listener)
  190. {
  191. struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
  192. int res;
  193. /* Hold a reference during shutdown */
  194. ao2_t_ref(listener->tps, +1, "tps-shutdown");
  195. ast_taskprocessor_push(listener->tps, default_listener_die, pvt);
  196. ast_assert(pvt->poll_thread != AST_PTHREADT_NULL);
  197. if (pthread_equal(pthread_self(), pvt->poll_thread)) {
  198. res = pthread_detach(pvt->poll_thread);
  199. if (res != 0) {
  200. ast_log(LOG_ERROR, "pthread_detach(): %s\n", strerror(errno));
  201. }
  202. } else {
  203. res = pthread_join(pvt->poll_thread, NULL);
  204. if (res != 0) {
  205. ast_log(LOG_ERROR, "pthread_join(): %s\n", strerror(errno));
  206. }
  207. }
  208. pvt->poll_thread = AST_PTHREADT_NULL;
  209. }
  210. static const struct ast_taskprocessor_listener_callbacks default_listener_callbacks = {
  211. .start = default_listener_start,
  212. .task_pushed = default_task_pushed,
  213. .shutdown = default_listener_shutdown,
  214. .dtor = default_listener_pvt_dtor,
  215. };
  216. /*!
  217. * \internal
  218. * \brief Clean up resources on Asterisk shutdown
  219. */
  220. static void tps_shutdown(void)
  221. {
  222. ast_cli_unregister_multiple(taskprocessor_clis, ARRAY_LEN(taskprocessor_clis));
  223. ao2_t_ref(tps_singletons, -1, "Unref tps_singletons in shutdown");
  224. tps_singletons = NULL;
  225. }
  226. /* initialize the taskprocessor container and register CLI operations */
  227. int ast_tps_init(void)
  228. {
  229. if (!(tps_singletons = ao2_container_alloc(TPS_MAX_BUCKETS, tps_hash_cb, tps_cmp_cb))) {
  230. ast_log(LOG_ERROR, "taskprocessor container failed to initialize!\n");
  231. return -1;
  232. }
  233. ast_cond_init(&cli_ping_cond, NULL);
  234. ast_cli_register_multiple(taskprocessor_clis, ARRAY_LEN(taskprocessor_clis));
  235. ast_register_atexit(tps_shutdown);
  236. return 0;
  237. }
  238. /* allocate resources for the task */
  239. static struct tps_task *tps_task_alloc(int (*task_exe)(void *datap), void *datap)
  240. {
  241. struct tps_task *t;
  242. if (!task_exe) {
  243. ast_log(LOG_ERROR, "task_exe is NULL!\n");
  244. return NULL;
  245. }
  246. t = ast_calloc(1, sizeof(*t));
  247. if (!t) {
  248. ast_log(LOG_ERROR, "failed to allocate task!\n");
  249. return NULL;
  250. }
  251. t->callback.execute = task_exe;
  252. t->datap = datap;
  253. return t;
  254. }
  255. static struct tps_task *tps_task_alloc_local(int (*task_exe)(struct ast_taskprocessor_local *local), void *datap)
  256. {
  257. struct tps_task *t;
  258. if (!task_exe) {
  259. ast_log(LOG_ERROR, "task_exe is NULL!\n");
  260. return NULL;
  261. }
  262. t = ast_calloc(1, sizeof(*t));
  263. if (!t) {
  264. ast_log(LOG_ERROR, "failed to allocate task!\n");
  265. return NULL;
  266. }
  267. t->callback.execute_local = task_exe;
  268. t->datap = datap;
  269. t->wants_local = 1;
  270. return t;
  271. }
  272. /* release task resources */
  273. static void *tps_task_free(struct tps_task *task)
  274. {
  275. ast_free(task);
  276. return NULL;
  277. }
  278. /* taskprocessor tab completion */
  279. static char *tps_taskprocessor_tab_complete(struct ast_taskprocessor *p, struct ast_cli_args *a)
  280. {
  281. int tklen;
  282. int wordnum = 0;
  283. char *name = NULL;
  284. struct ao2_iterator i;
  285. if (a->pos != 3)
  286. return NULL;
  287. tklen = strlen(a->word);
  288. i = ao2_iterator_init(tps_singletons, 0);
  289. while ((p = ao2_iterator_next(&i))) {
  290. if (!strncasecmp(a->word, p->name, tklen) && ++wordnum > a->n) {
  291. name = ast_strdup(p->name);
  292. ao2_ref(p, -1);
  293. break;
  294. }
  295. ao2_ref(p, -1);
  296. }
  297. ao2_iterator_destroy(&i);
  298. return name;
  299. }
  300. /* ping task handling function */
  301. static int tps_ping_handler(void *datap)
  302. {
  303. ast_mutex_lock(&cli_ping_cond_lock);
  304. ast_cond_signal(&cli_ping_cond);
  305. ast_mutex_unlock(&cli_ping_cond_lock);
  306. return 0;
  307. }
  308. /* ping the specified taskprocessor and display the ping time on the CLI */
  309. static char *cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
  310. {
  311. struct timeval begin, end, delta;
  312. const char *name;
  313. struct timeval when;
  314. struct timespec ts;
  315. struct ast_taskprocessor *tps = NULL;
  316. switch (cmd) {
  317. case CLI_INIT:
  318. e->command = "core ping taskprocessor";
  319. e->usage =
  320. "Usage: core ping taskprocessor <taskprocessor>\n"
  321. " Displays the time required for a task to be processed\n";
  322. return NULL;
  323. case CLI_GENERATE:
  324. return tps_taskprocessor_tab_complete(tps, a);
  325. }
  326. if (a->argc != 4)
  327. return CLI_SHOWUSAGE;
  328. name = a->argv[3];
  329. if (!(tps = ast_taskprocessor_get(name, TPS_REF_IF_EXISTS))) {
  330. ast_cli(a->fd, "\nping failed: %s not found\n\n", name);
  331. return CLI_SUCCESS;
  332. }
  333. ast_cli(a->fd, "\npinging %s ...", name);
  334. when = ast_tvadd((begin = ast_tvnow()), ast_samp2tv(1000, 1000));
  335. ts.tv_sec = when.tv_sec;
  336. ts.tv_nsec = when.tv_usec * 1000;
  337. ast_mutex_lock(&cli_ping_cond_lock);
  338. if (ast_taskprocessor_push(tps, tps_ping_handler, 0) < 0) {
  339. ast_cli(a->fd, "\nping failed: could not push task to %s\n\n", name);
  340. ao2_ref(tps, -1);
  341. return CLI_FAILURE;
  342. }
  343. ast_cond_timedwait(&cli_ping_cond, &cli_ping_cond_lock, &ts);
  344. ast_mutex_unlock(&cli_ping_cond_lock);
  345. end = ast_tvnow();
  346. delta = ast_tvsub(end, begin);
  347. ast_cli(a->fd, "\n\t%24s ping time: %.1ld.%.6ld sec\n\n", name, (long)delta.tv_sec, (long int)delta.tv_usec);
  348. ao2_ref(tps, -1);
  349. return CLI_SUCCESS;
  350. }
  351. static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
  352. {
  353. char name[256];
  354. int tcount;
  355. unsigned long qsize;
  356. unsigned long maxqsize;
  357. unsigned long processed;
  358. struct ast_taskprocessor *p;
  359. struct ao2_iterator i;
  360. switch (cmd) {
  361. case CLI_INIT:
  362. e->command = "core show taskprocessors";
  363. e->usage =
  364. "Usage: core show taskprocessors\n"
  365. " Shows a list of instantiated task processors and their statistics\n";
  366. return NULL;
  367. case CLI_GENERATE:
  368. return NULL;
  369. }
  370. if (a->argc != e->args)
  371. return CLI_SHOWUSAGE;
  372. ast_cli(a->fd, "\n\t+----- Processor -----+--- Processed ---+- In Queue -+- Max Depth -+");
  373. i = ao2_iterator_init(tps_singletons, 0);
  374. while ((p = ao2_iterator_next(&i))) {
  375. ast_copy_string(name, p->name, sizeof(name));
  376. qsize = p->tps_queue_size;
  377. maxqsize = p->stats->max_qsize;
  378. processed = p->stats->_tasks_processed_count;
  379. ast_cli(a->fd, "\n%24s %17lu %12lu %12lu", name, processed, qsize, maxqsize);
  380. ao2_ref(p, -1);
  381. }
  382. ao2_iterator_destroy(&i);
  383. tcount = ao2_container_count(tps_singletons);
  384. ast_cli(a->fd, "\n\t+---------------------+-----------------+------------+-------------+\n\t%d taskprocessors\n\n", tcount);
  385. return CLI_SUCCESS;
  386. }
  387. /* hash callback for astobj2 */
  388. static int tps_hash_cb(const void *obj, const int flags)
  389. {
  390. const struct ast_taskprocessor *tps = obj;
  391. const char *name = flags & OBJ_KEY ? obj : tps->name;
  392. return ast_str_case_hash(name);
  393. }
  394. /* compare callback for astobj2 */
  395. static int tps_cmp_cb(void *obj, void *arg, int flags)
  396. {
  397. struct ast_taskprocessor *lhs = obj, *rhs = arg;
  398. const char *rhsname = flags & OBJ_KEY ? arg : rhs->name;
  399. return !strcasecmp(lhs->name, rhsname) ? CMP_MATCH | CMP_STOP : 0;
  400. }
  401. /* destroy the taskprocessor */
  402. static void tps_taskprocessor_destroy(void *tps)
  403. {
  404. struct ast_taskprocessor *t = tps;
  405. struct tps_task *task;
  406. if (!tps) {
  407. ast_log(LOG_ERROR, "missing taskprocessor\n");
  408. return;
  409. }
  410. ast_debug(1, "destroying taskprocessor '%s'\n", t->name);
  411. /* free it */
  412. ast_free(t->stats);
  413. t->stats = NULL;
  414. ast_free((char *) t->name);
  415. if (t->listener) {
  416. ao2_ref(t->listener, -1);
  417. t->listener = NULL;
  418. }
  419. while ((task = AST_LIST_REMOVE_HEAD(&t->tps_queue, list))) {
  420. tps_task_free(task);
  421. }
  422. }
  423. /* pop the front task and return it */
  424. static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps)
  425. {
  426. struct tps_task *task;
  427. if ((task = AST_LIST_REMOVE_HEAD(&tps->tps_queue, list))) {
  428. tps->tps_queue_size--;
  429. }
  430. return task;
  431. }
  432. static int tps_taskprocessor_depth(struct ast_taskprocessor *tps)
  433. {
  434. return (tps) ? tps->tps_queue_size : -1;
  435. }
  436. /* taskprocessor name accessor */
  437. const char *ast_taskprocessor_name(struct ast_taskprocessor *tps)
  438. {
  439. if (!tps) {
  440. ast_log(LOG_ERROR, "no taskprocessor specified!\n");
  441. return NULL;
  442. }
  443. return tps->name;
  444. }
  445. static void listener_shutdown(struct ast_taskprocessor_listener *listener)
  446. {
  447. listener->callbacks->shutdown(listener);
  448. ao2_ref(listener->tps, -1);
  449. }
  450. static void taskprocessor_listener_dtor(void *obj)
  451. {
  452. struct ast_taskprocessor_listener *listener = obj;
  453. if (listener->callbacks->dtor) {
  454. listener->callbacks->dtor(listener);
  455. }
  456. }
  457. struct ast_taskprocessor_listener *ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data)
  458. {
  459. struct ast_taskprocessor_listener *listener;
  460. listener = ao2_alloc(sizeof(*listener), taskprocessor_listener_dtor);
  461. if (!listener) {
  462. return NULL;
  463. }
  464. listener->callbacks = callbacks;
  465. listener->user_data = user_data;
  466. return listener;
  467. }
  468. struct ast_taskprocessor *ast_taskprocessor_listener_get_tps(const struct ast_taskprocessor_listener *listener)
  469. {
  470. ao2_ref(listener->tps, +1);
  471. return listener->tps;
  472. }
  473. void *ast_taskprocessor_listener_get_user_data(const struct ast_taskprocessor_listener *listener)
  474. {
  475. return listener->user_data;
  476. }
  477. static void *default_listener_pvt_alloc(void)
  478. {
  479. struct default_taskprocessor_listener_pvt *pvt;
  480. pvt = ast_calloc(1, sizeof(*pvt));
  481. if (!pvt) {
  482. return NULL;
  483. }
  484. pvt->poll_thread = AST_PTHREADT_NULL;
  485. if (ast_sem_init(&pvt->sem, 0, 0) != 0) {
  486. ast_log(LOG_ERROR, "ast_sem_init(): %s\n", strerror(errno));
  487. ast_free(pvt);
  488. return NULL;
  489. }
  490. return pvt;
  491. }
  492. static struct ast_taskprocessor *__allocate_taskprocessor(const char *name, struct ast_taskprocessor_listener *listener)
  493. {
  494. RAII_VAR(struct ast_taskprocessor *, p,
  495. ao2_alloc(sizeof(*p), tps_taskprocessor_destroy), ao2_cleanup);
  496. if (!p) {
  497. ast_log(LOG_WARNING, "failed to create taskprocessor '%s'\n", name);
  498. return NULL;
  499. }
  500. if (!(p->stats = ast_calloc(1, sizeof(*p->stats)))) {
  501. ast_log(LOG_WARNING, "failed to create taskprocessor stats for '%s'\n", name);
  502. return NULL;
  503. }
  504. if (!(p->name = ast_strdup(name))) {
  505. ao2_ref(p, -1);
  506. return NULL;
  507. }
  508. ao2_ref(listener, +1);
  509. p->listener = listener;
  510. p->thread = AST_PTHREADT_NULL;
  511. ao2_ref(p, +1);
  512. listener->tps = p;
  513. if (!(ao2_link(tps_singletons, p))) {
  514. ast_log(LOG_ERROR, "Failed to add taskprocessor '%s' to container\n", p->name);
  515. return NULL;
  516. }
  517. if (p->listener->callbacks->start(p->listener)) {
  518. ast_log(LOG_ERROR, "Unable to start taskprocessor listener for taskprocessor %s\n", p->name);
  519. ast_taskprocessor_unreference(p);
  520. return NULL;
  521. }
  522. /* RAII_VAR will decrement the refcount at the end of the function.
  523. * Since we want to pass back a reference to p, we bump the refcount
  524. */
  525. ao2_ref(p, +1);
  526. return p;
  527. }
  528. /* Provide a reference to a taskprocessor. Create the taskprocessor if necessary, but don't
  529. * create the taskprocessor if we were told via ast_tps_options to return a reference only
  530. * if it already exists */
  531. struct ast_taskprocessor *ast_taskprocessor_get(const char *name, enum ast_tps_options create)
  532. {
  533. struct ast_taskprocessor *p;
  534. struct ast_taskprocessor_listener *listener;
  535. struct default_taskprocessor_listener_pvt *pvt;
  536. if (ast_strlen_zero(name)) {
  537. ast_log(LOG_ERROR, "requesting a nameless taskprocessor!!!\n");
  538. return NULL;
  539. }
  540. p = ao2_find(tps_singletons, name, OBJ_KEY);
  541. if (p) {
  542. return p;
  543. }
  544. if (create & TPS_REF_IF_EXISTS) {
  545. /* calling function does not want a new taskprocessor to be created if it doesn't already exist */
  546. return NULL;
  547. }
  548. /* Create a new taskprocessor. Start by creating a default listener */
  549. pvt = default_listener_pvt_alloc();
  550. if (!pvt) {
  551. return NULL;
  552. }
  553. listener = ast_taskprocessor_listener_alloc(&default_listener_callbacks, pvt);
  554. if (!listener) {
  555. default_listener_pvt_destroy(pvt);
  556. return NULL;
  557. }
  558. p = __allocate_taskprocessor(name, listener);
  559. if (!p) {
  560. ao2_ref(listener, -1);
  561. return NULL;
  562. }
  563. /* Unref listener here since the taskprocessor has gained a reference to the listener */
  564. ao2_ref(listener, -1);
  565. return p;
  566. }
  567. struct ast_taskprocessor *ast_taskprocessor_create_with_listener(const char *name, struct ast_taskprocessor_listener *listener)
  568. {
  569. struct ast_taskprocessor *p = ao2_find(tps_singletons, name, OBJ_KEY);
  570. if (p) {
  571. ast_taskprocessor_unreference(p);
  572. return NULL;
  573. }
  574. return __allocate_taskprocessor(name, listener);
  575. }
  576. void ast_taskprocessor_set_local(struct ast_taskprocessor *tps,
  577. void *local_data)
  578. {
  579. SCOPED_AO2LOCK(lock, tps);
  580. tps->local_data = local_data;
  581. }
  582. /* decrement the taskprocessor reference count and unlink from the container if necessary */
  583. void *ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
  584. {
  585. if (!tps) {
  586. return NULL;
  587. }
  588. if (ao2_ref(tps, -1) > 3) {
  589. return NULL;
  590. }
  591. /* If we're down to 3 references, then those must be:
  592. * 1. The reference we just got rid of
  593. * 2. The container
  594. * 3. The listener
  595. */
  596. ao2_unlink(tps_singletons, tps);
  597. listener_shutdown(tps->listener);
  598. return NULL;
  599. }
  600. /* push the task into the taskprocessor queue */
  601. static int taskprocessor_push(struct ast_taskprocessor *tps, struct tps_task *t)
  602. {
  603. int previous_size;
  604. int was_empty;
  605. if (!tps) {
  606. ast_log(LOG_ERROR, "tps is NULL!\n");
  607. return -1;
  608. }
  609. if (!t) {
  610. ast_log(LOG_ERROR, "t is NULL!\n");
  611. return -1;
  612. }
  613. ao2_lock(tps);
  614. AST_LIST_INSERT_TAIL(&tps->tps_queue, t, list);
  615. previous_size = tps->tps_queue_size++;
  616. /* The currently executing task counts as still in queue */
  617. was_empty = tps->executing ? 0 : previous_size == 0;
  618. ao2_unlock(tps);
  619. tps->listener->callbacks->task_pushed(tps->listener, was_empty);
  620. return 0;
  621. }
  /*!
   * \brief Queue a task on a taskprocessor.
   *
   * \param tps Taskprocessor to queue on.
   * \param task_exe Callback to run.
   * \param datap Argument passed to \a task_exe.
   * \retval 0 the task was queued
   * \retval -1 the task could not be allocated or queued
   */
  int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap)
  {
      struct tps_task *task = tps_task_alloc(task_exe, datap);

      return taskprocessor_push(tps, task);
  }
  /*!
   * \brief Queue a task whose callback receives an ast_taskprocessor_local.
   *
   * \param tps Taskprocessor to queue on.
   * \param task_exe Callback to run.
   * \param datap Argument made available to \a task_exe.
   * \retval 0 the task was queued
   * \retval -1 the task could not be allocated or queued
   */
  int ast_taskprocessor_push_local(struct ast_taskprocessor *tps, int (*task_exe)(struct ast_taskprocessor_local *datap), void *datap)
  {
      struct tps_task *task = tps_task_alloc_local(task_exe, datap);

      return taskprocessor_push(tps, task);
  }
  630. int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
  631. {
  632. struct ast_taskprocessor_local local;
  633. struct tps_task *t;
  634. int size;
  635. ao2_lock(tps);
  636. t = tps_taskprocessor_pop(tps);
  637. if (!t) {
  638. ao2_unlock(tps);
  639. return 0;
  640. }
  641. tps->thread = pthread_self();
  642. tps->executing = 1;
  643. if (t->wants_local) {
  644. local.local_data = tps->local_data;
  645. local.data = t->datap;
  646. }
  647. ao2_unlock(tps);
  648. if (t->wants_local) {
  649. t->callback.execute_local(&local);
  650. } else {
  651. t->callback.execute(t->datap);
  652. }
  653. tps_task_free(t);
  654. ao2_lock(tps);
  655. tps->thread = AST_PTHREADT_NULL;
  656. /* We need to check size in the same critical section where we reset the
  657. * executing bit. Avoids a race condition where a task is pushed right
  658. * after we pop an empty stack.
  659. */
  660. tps->executing = 0;
  661. size = tps_taskprocessor_depth(tps);
  662. /* If we executed a task, bump the stats */
  663. if (tps->stats) {
  664. tps->stats->_tasks_processed_count++;
  665. if (size > tps->stats->max_qsize) {
  666. tps->stats->max_qsize = size;
  667. }
  668. }
  669. ao2_unlock(tps);
  670. /* If we executed a task, check for the transition to empty */
  671. if (size == 0 && tps->listener->callbacks->emptied) {
  672. tps->listener->callbacks->emptied(tps->listener);
  673. }
  674. return size > 0;
  675. }
  676. int ast_taskprocessor_is_task(struct ast_taskprocessor *tps)
  677. {
  678. int is_task;
  679. ao2_lock(tps);
  680. is_task = pthread_equal(tps->thread, pthread_self());
  681. ao2_unlock(tps);
  682. return is_task;
  683. }