taskprocessor.c 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522
  1. /*
  2. * Asterisk -- An open source telephony toolkit.
  3. *
  4. * Copyright (C) 2007-2008, Digium, Inc.
  5. *
  6. * Dwayne M. Hubbard <dhubbard@digium.com>
  7. *
  8. * See http://www.asterisk.org for more information about
  9. * the Asterisk project. Please do not directly contact
  10. * any of the maintainers of this project for assistance;
  11. * the project provides a web site, mailing lists and IRC
  12. * channels for your use.
  13. *
  14. * This program is free software, distributed under the terms of
  15. * the GNU General Public License Version 2. See the LICENSE file
  16. * at the top of the source tree.
  17. */
  18. /*!
  19. * \file
  20. * \brief Maintain a container of uniquely-named taskprocessor threads that can be shared across modules.
  21. *
  22. * \author Dwayne Hubbard <dhubbard@digium.com>
  23. */
  24. /*** MODULEINFO
  25. <support_level>core</support_level>
  26. ***/
  27. #include "asterisk.h"
  28. ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
  29. #include "asterisk/_private.h"
  30. #include "asterisk/module.h"
  31. #include "asterisk/time.h"
  32. #include "asterisk/astobj2.h"
  33. #include "asterisk/cli.h"
  34. #include "asterisk/taskprocessor.h"
  35. /*!
  36. * \brief tps_task structure is queued to a taskprocessor
  37. *
  38. * tps_tasks are processed in FIFO order and freed by the taskprocessing
  39. * thread after the task handler returns. The callback function that is assigned
  40. * to the execute() function pointer is responsible for releasing datap resources if necessary.
  41. */
  42. struct tps_task {
  43. /*! \brief The execute() task callback function pointer */
  44. int (*execute)(void *datap);
  45. /*! \brief The data pointer for the task execute() function */
  46. void *datap;
  47. /*! \brief AST_LIST_ENTRY overhead */
  48. AST_LIST_ENTRY(tps_task) list;
  49. };
  50. /*! \brief tps_taskprocessor_stats maintain statistics for a taskprocessor. */
  51. struct tps_taskprocessor_stats {
  52. /*! \brief This is the maximum number of tasks queued at any one time */
  53. unsigned long max_qsize;
  54. /*! \brief This is the current number of tasks processed */
  55. unsigned long _tasks_processed_count;
  56. };
  57. /*! \brief A ast_taskprocessor structure is a singleton by name */
  58. struct ast_taskprocessor {
  59. /*! \brief Friendly name of the taskprocessor */
  60. const char *name;
  61. /*! \brief Thread poll condition */
  62. ast_cond_t poll_cond;
  63. /*! \brief Taskprocessor thread */
  64. pthread_t poll_thread;
  65. /*! \brief Taskprocessor lock */
  66. ast_mutex_t taskprocessor_lock;
  67. /*! \brief Taskprocesor thread run flag */
  68. unsigned char poll_thread_run;
  69. /*! \brief Taskprocessor statistics */
  70. struct tps_taskprocessor_stats *stats;
  71. /*! \brief Taskprocessor current queue size */
  72. long tps_queue_size;
  73. /*! \brief Taskprocessor queue */
  74. AST_LIST_HEAD_NOLOCK(tps_queue, tps_task) tps_queue;
  75. /*! \brief Taskprocessor singleton list entry */
  76. AST_LIST_ENTRY(ast_taskprocessor) list;
  77. };
  78. #define TPS_MAX_BUCKETS 7
  79. /*! \brief tps_singletons is the astobj2 container for taskprocessor singletons */
  80. static struct ao2_container *tps_singletons;
  81. /*! \brief CLI <example>taskprocessor ping &lt;blah&gt;</example> operation requires a ping condition */
  82. static ast_cond_t cli_ping_cond;
  83. /*! \brief CLI <example>taskprocessor ping &lt;blah&gt;</example> operation requires a ping condition lock */
  84. AST_MUTEX_DEFINE_STATIC(cli_ping_cond_lock);
  85. /*! \brief The astobj2 hash callback for taskprocessors */
  86. static int tps_hash_cb(const void *obj, const int flags);
  87. /*! \brief The astobj2 compare callback for taskprocessors */
  88. static int tps_cmp_cb(void *obj, void *arg, int flags);
  89. /*! \brief The task processing function executed by a taskprocessor */
  90. static void *tps_processing_function(void *data);
  91. /*! \brief Destroy the taskprocessor when its refcount reaches zero */
  92. static void tps_taskprocessor_destroy(void *tps);
  93. /*! \brief CLI <example>taskprocessor ping &lt;blah&gt;</example> handler function */
  94. static int tps_ping_handler(void *datap);
  95. /*! \brief Remove the front task off the taskprocessor queue */
  96. static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps);
  97. /*! \brief Return the size of the taskprocessor queue */
  98. static int tps_taskprocessor_depth(struct ast_taskprocessor *tps);
  99. static char *cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
  100. static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
  101. static struct ast_cli_entry taskprocessor_clis[] = {
  102. AST_CLI_DEFINE(cli_tps_ping, "Ping a named task processor"),
  103. AST_CLI_DEFINE(cli_tps_report, "List instantiated task processors and statistics"),
  104. };
  105. /*!
  106. * \internal
  107. * \brief Clean up resources on Asterisk shutdown
  108. */
  109. static void tps_shutdown(void)
  110. {
  111. ast_cli_unregister_multiple(taskprocessor_clis, ARRAY_LEN(taskprocessor_clis));
  112. ao2_t_ref(tps_singletons, -1, "Unref tps_singletons in shutdown");
  113. tps_singletons = NULL;
  114. }
  115. /* initialize the taskprocessor container and register CLI operations */
  116. int ast_tps_init(void)
  117. {
  118. if (!(tps_singletons = ao2_container_alloc(TPS_MAX_BUCKETS, tps_hash_cb, tps_cmp_cb))) {
  119. ast_log(LOG_ERROR, "taskprocessor container failed to initialize!\n");
  120. return -1;
  121. }
  122. ast_cond_init(&cli_ping_cond, NULL);
  123. ast_cli_register_multiple(taskprocessor_clis, ARRAY_LEN(taskprocessor_clis));
  124. ast_register_cleanup(tps_shutdown);
  125. return 0;
  126. }
  127. /* allocate resources for the task */
  128. static struct tps_task *tps_task_alloc(int (*task_exe)(void *datap), void *datap)
  129. {
  130. struct tps_task *t;
  131. if ((t = ast_calloc(1, sizeof(*t)))) {
  132. t->execute = task_exe;
  133. t->datap = datap;
  134. }
  135. return t;
  136. }
  137. /* release task resources */
  138. static void *tps_task_free(struct tps_task *task)
  139. {
  140. if (task) {
  141. ast_free(task);
  142. }
  143. return NULL;
  144. }
  145. /* taskprocessor tab completion */
  146. static char *tps_taskprocessor_tab_complete(struct ast_taskprocessor *p, struct ast_cli_args *a)
  147. {
  148. int tklen;
  149. int wordnum = 0;
  150. char *name = NULL;
  151. struct ao2_iterator i;
  152. if (a->pos != 3)
  153. return NULL;
  154. tklen = strlen(a->word);
  155. i = ao2_iterator_init(tps_singletons, 0);
  156. while ((p = ao2_iterator_next(&i))) {
  157. if (!strncasecmp(a->word, p->name, tklen) && ++wordnum > a->n) {
  158. name = ast_strdup(p->name);
  159. ao2_ref(p, -1);
  160. break;
  161. }
  162. ao2_ref(p, -1);
  163. }
  164. ao2_iterator_destroy(&i);
  165. return name;
  166. }
  167. /* ping task handling function */
  168. static int tps_ping_handler(void *datap)
  169. {
  170. ast_mutex_lock(&cli_ping_cond_lock);
  171. ast_cond_signal(&cli_ping_cond);
  172. ast_mutex_unlock(&cli_ping_cond_lock);
  173. return 0;
  174. }
  175. /* ping the specified taskprocessor and display the ping time on the CLI */
  176. static char *cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
  177. {
  178. struct timeval begin, end, delta;
  179. const char *name;
  180. struct timeval when;
  181. struct timespec ts;
  182. struct ast_taskprocessor *tps = NULL;
  183. switch (cmd) {
  184. case CLI_INIT:
  185. e->command = "core ping taskprocessor";
  186. e->usage =
  187. "Usage: core ping taskprocessor <taskprocessor>\n"
  188. " Displays the time required for a task to be processed\n";
  189. return NULL;
  190. case CLI_GENERATE:
  191. return tps_taskprocessor_tab_complete(tps, a);
  192. }
  193. if (a->argc != 4)
  194. return CLI_SHOWUSAGE;
  195. name = a->argv[3];
  196. if (!(tps = ast_taskprocessor_get(name, TPS_REF_IF_EXISTS))) {
  197. ast_cli(a->fd, "\nping failed: %s not found\n\n", name);
  198. return CLI_SUCCESS;
  199. }
  200. ast_cli(a->fd, "\npinging %s ...", name);
  201. when = ast_tvadd((begin = ast_tvnow()), ast_samp2tv(1000, 1000));
  202. ts.tv_sec = when.tv_sec;
  203. ts.tv_nsec = when.tv_usec * 1000;
  204. ast_mutex_lock(&cli_ping_cond_lock);
  205. if (ast_taskprocessor_push(tps, tps_ping_handler, 0) < 0) {
  206. ast_cli(a->fd, "\nping failed: could not push task to %s\n\n", name);
  207. ao2_ref(tps, -1);
  208. return CLI_FAILURE;
  209. }
  210. ast_cond_timedwait(&cli_ping_cond, &cli_ping_cond_lock, &ts);
  211. ast_mutex_unlock(&cli_ping_cond_lock);
  212. end = ast_tvnow();
  213. delta = ast_tvsub(end, begin);
  214. ast_cli(a->fd, "\n\t%24s ping time: %.1ld.%.6ld sec\n\n", name, (long)delta.tv_sec, (long int)delta.tv_usec);
  215. ao2_ref(tps, -1);
  216. return CLI_SUCCESS;
  217. }
  218. static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
  219. {
  220. char name[256];
  221. int tcount;
  222. unsigned long qsize;
  223. unsigned long maxqsize;
  224. unsigned long processed;
  225. struct ast_taskprocessor *p;
  226. struct ao2_iterator i;
  227. switch (cmd) {
  228. case CLI_INIT:
  229. e->command = "core show taskprocessors";
  230. e->usage =
  231. "Usage: core show taskprocessors\n"
  232. " Shows a list of instantiated task processors and their statistics\n";
  233. return NULL;
  234. case CLI_GENERATE:
  235. return NULL;
  236. }
  237. if (a->argc != e->args)
  238. return CLI_SHOWUSAGE;
  239. ast_cli(a->fd, "\n\t+----- Processor -----+--- Processed ---+- In Queue -+- Max Depth -+");
  240. i = ao2_iterator_init(tps_singletons, 0);
  241. while ((p = ao2_iterator_next(&i))) {
  242. ast_copy_string(name, p->name, sizeof(name));
  243. qsize = p->tps_queue_size;
  244. maxqsize = p->stats->max_qsize;
  245. processed = p->stats->_tasks_processed_count;
  246. ast_cli(a->fd, "\n%24s %17lu %12lu %12lu", name, processed, qsize, maxqsize);
  247. ao2_ref(p, -1);
  248. }
  249. ao2_iterator_destroy(&i);
  250. tcount = ao2_container_count(tps_singletons);
  251. ast_cli(a->fd, "\n\t+---------------------+-----------------+------------+-------------+\n\t%d taskprocessors\n\n", tcount);
  252. return CLI_SUCCESS;
  253. }
  254. /* this is the task processing worker function */
  255. static void *tps_processing_function(void *data)
  256. {
  257. struct ast_taskprocessor *i = data;
  258. struct tps_task *t;
  259. int size;
  260. if (!i) {
  261. ast_log(LOG_ERROR, "cannot start thread_function loop without a ast_taskprocessor structure.\n");
  262. return NULL;
  263. }
  264. while (i->poll_thread_run) {
  265. ast_mutex_lock(&i->taskprocessor_lock);
  266. if (!i->poll_thread_run) {
  267. ast_mutex_unlock(&i->taskprocessor_lock);
  268. break;
  269. }
  270. if (!(size = tps_taskprocessor_depth(i))) {
  271. ast_cond_wait(&i->poll_cond, &i->taskprocessor_lock);
  272. if (!i->poll_thread_run) {
  273. ast_mutex_unlock(&i->taskprocessor_lock);
  274. break;
  275. }
  276. }
  277. ast_mutex_unlock(&i->taskprocessor_lock);
  278. /* stuff is in the queue */
  279. if (!(t = tps_taskprocessor_pop(i))) {
  280. ast_log(LOG_ERROR, "Wtf?? %d tasks in the queue, but we're popping blanks!\n", size);
  281. continue;
  282. }
  283. if (!t->execute) {
  284. ast_log(LOG_WARNING, "Task is missing a function to execute!\n");
  285. tps_task_free(t);
  286. continue;
  287. }
  288. t->execute(t->datap);
  289. ast_mutex_lock(&i->taskprocessor_lock);
  290. if (i->stats) {
  291. i->stats->_tasks_processed_count++;
  292. if (size > i->stats->max_qsize) {
  293. i->stats->max_qsize = size;
  294. }
  295. }
  296. ast_mutex_unlock(&i->taskprocessor_lock);
  297. tps_task_free(t);
  298. }
  299. while ((t = tps_taskprocessor_pop(i))) {
  300. tps_task_free(t);
  301. }
  302. return NULL;
  303. }
  304. /* hash callback for astobj2 */
  305. static int tps_hash_cb(const void *obj, const int flags)
  306. {
  307. const struct ast_taskprocessor *tps = obj;
  308. return ast_str_case_hash(tps->name);
  309. }
  310. /* compare callback for astobj2 */
  311. static int tps_cmp_cb(void *obj, void *arg, int flags)
  312. {
  313. struct ast_taskprocessor *lhs = obj, *rhs = arg;
  314. return !strcasecmp(lhs->name, rhs->name) ? CMP_MATCH | CMP_STOP : 0;
  315. }
  316. /* destroy the taskprocessor */
  317. static void tps_taskprocessor_destroy(void *tps)
  318. {
  319. struct ast_taskprocessor *t = tps;
  320. if (!tps) {
  321. ast_log(LOG_ERROR, "missing taskprocessor\n");
  322. return;
  323. }
  324. ast_debug(1, "destroying taskprocessor '%s'\n", t->name);
  325. /* kill it */
  326. ast_mutex_lock(&t->taskprocessor_lock);
  327. t->poll_thread_run = 0;
  328. ast_cond_signal(&t->poll_cond);
  329. ast_mutex_unlock(&t->taskprocessor_lock);
  330. pthread_join(t->poll_thread, NULL);
  331. t->poll_thread = AST_PTHREADT_NULL;
  332. ast_mutex_destroy(&t->taskprocessor_lock);
  333. ast_cond_destroy(&t->poll_cond);
  334. /* free it */
  335. if (t->stats) {
  336. ast_free(t->stats);
  337. t->stats = NULL;
  338. }
  339. ast_free((char *) t->name);
  340. }
  341. /* pop the front task and return it */
  342. static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps)
  343. {
  344. struct tps_task *task;
  345. if (!tps) {
  346. ast_log(LOG_ERROR, "missing taskprocessor\n");
  347. return NULL;
  348. }
  349. ast_mutex_lock(&tps->taskprocessor_lock);
  350. if ((task = AST_LIST_REMOVE_HEAD(&tps->tps_queue, list))) {
  351. tps->tps_queue_size--;
  352. }
  353. ast_mutex_unlock(&tps->taskprocessor_lock);
  354. return task;
  355. }
  356. static int tps_taskprocessor_depth(struct ast_taskprocessor *tps)
  357. {
  358. return (tps) ? tps->tps_queue_size : -1;
  359. }
  360. /* taskprocessor name accessor */
  361. const char *ast_taskprocessor_name(struct ast_taskprocessor *tps)
  362. {
  363. if (!tps) {
  364. ast_log(LOG_ERROR, "no taskprocessor specified!\n");
  365. return NULL;
  366. }
  367. return tps->name;
  368. }
  369. /* Provide a reference to a taskprocessor. Create the taskprocessor if necessary, but don't
  370. * create the taskprocessor if we were told via ast_tps_options to return a reference only
  371. * if it already exists */
  372. struct ast_taskprocessor *ast_taskprocessor_get(const char *name, enum ast_tps_options create)
  373. {
  374. struct ast_taskprocessor *p, tmp_tps = {
  375. .name = name,
  376. };
  377. if (ast_strlen_zero(name)) {
  378. ast_log(LOG_ERROR, "requesting a nameless taskprocessor!!!\n");
  379. return NULL;
  380. }
  381. ao2_lock(tps_singletons);
  382. p = ao2_find(tps_singletons, &tmp_tps, OBJ_POINTER);
  383. if (p) {
  384. ao2_unlock(tps_singletons);
  385. return p;
  386. }
  387. if (create & TPS_REF_IF_EXISTS) {
  388. /* calling function does not want a new taskprocessor to be created if it doesn't already exist */
  389. ao2_unlock(tps_singletons);
  390. return NULL;
  391. }
  392. /* create a new taskprocessor */
  393. if (!(p = ao2_alloc(sizeof(*p), tps_taskprocessor_destroy))) {
  394. ao2_unlock(tps_singletons);
  395. ast_log(LOG_WARNING, "failed to create taskprocessor '%s'\n", name);
  396. return NULL;
  397. }
  398. ast_cond_init(&p->poll_cond, NULL);
  399. ast_mutex_init(&p->taskprocessor_lock);
  400. if (!(p->stats = ast_calloc(1, sizeof(*p->stats)))) {
  401. ao2_unlock(tps_singletons);
  402. ast_log(LOG_WARNING, "failed to create taskprocessor stats for '%s'\n", name);
  403. ao2_ref(p, -1);
  404. return NULL;
  405. }
  406. if (!(p->name = ast_strdup(name))) {
  407. ao2_unlock(tps_singletons);
  408. ao2_ref(p, -1);
  409. return NULL;
  410. }
  411. p->poll_thread_run = 1;
  412. p->poll_thread = AST_PTHREADT_NULL;
  413. if (ast_pthread_create(&p->poll_thread, NULL, tps_processing_function, p) < 0) {
  414. ao2_unlock(tps_singletons);
  415. ast_log(LOG_ERROR, "Taskprocessor '%s' failed to create the processing thread.\n", p->name);
  416. ao2_ref(p, -1);
  417. return NULL;
  418. }
  419. if (!(ao2_link(tps_singletons, p))) {
  420. ao2_unlock(tps_singletons);
  421. ast_log(LOG_ERROR, "Failed to add taskprocessor '%s' to container\n", p->name);
  422. ao2_ref(p, -1);
  423. return NULL;
  424. }
  425. ao2_unlock(tps_singletons);
  426. return p;
  427. }
  428. /* decrement the taskprocessor reference count and unlink from the container if necessary */
  429. void *ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
  430. {
  431. if (tps) {
  432. ao2_lock(tps_singletons);
  433. ao2_unlink(tps_singletons, tps);
  434. if (ao2_ref(tps, -1) > 1) {
  435. ao2_link(tps_singletons, tps);
  436. }
  437. ao2_unlock(tps_singletons);
  438. }
  439. return NULL;
  440. }
  441. /* push the task into the taskprocessor queue */
  442. int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap)
  443. {
  444. struct tps_task *t;
  445. if (!tps || !task_exe) {
  446. ast_log(LOG_ERROR, "%s is missing!!\n", (tps) ? "task callback" : "taskprocessor");
  447. return -1;
  448. }
  449. if (!(t = tps_task_alloc(task_exe, datap))) {
  450. ast_log(LOG_ERROR, "failed to allocate task! Can't push to '%s'\n", tps->name);
  451. return -1;
  452. }
  453. ast_mutex_lock(&tps->taskprocessor_lock);
  454. AST_LIST_INSERT_TAIL(&tps->tps_queue, t, list);
  455. tps->tps_queue_size++;
  456. ast_cond_signal(&tps->poll_cond);
  457. ast_mutex_unlock(&tps->taskprocessor_lock);
  458. return 0;
  459. }