sched.c 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687
  1. /*
  2. * Asterisk -- An open source telephony toolkit.
  3. *
  4. * Copyright (C) 1999 - 2010, Digium, Inc.
  5. *
  6. * Mark Spencer <markster@digium.com>
  7. * Russell Bryant <russell@digium.com>
  8. *
  9. * See http://www.asterisk.org for more information about
  10. * the Asterisk project. Please do not directly contact
  11. * any of the maintainers of this project for assistance;
  12. * the project provides a web site, mailing lists and IRC
  13. * channels for your use.
  14. *
  15. * This program is free software, distributed under the terms of
  16. * the GNU General Public License Version 2. See the LICENSE file
  17. * at the top of the source tree.
  18. */
  19. /*! \file
  20. *
  21. * \brief Scheduler Routines (from cheops-NG)
  22. *
  23. * \author Mark Spencer <markster@digium.com>
  24. */
  25. /*** MODULEINFO
  26. <support_level>core</support_level>
  27. ***/
  28. #include "asterisk.h"
  29. ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
  30. #ifdef DEBUG_SCHEDULER
  31. #define DEBUG(a) do { \
  32. if (option_debug) \
  33. DEBUG_M(a) \
  34. } while (0)
  35. #else
  36. #define DEBUG(a)
  37. #endif
  38. #include <sys/time.h>
  39. #include "asterisk/sched.h"
  40. #include "asterisk/channel.h"
  41. #include "asterisk/lock.h"
  42. #include "asterisk/utils.h"
  43. #include "asterisk/heap.h"
  44. #include "asterisk/threadstorage.h"
  45. /*!
  46. * \brief Max num of schedule structs
  47. *
  48. * \note The max number of schedule structs to keep around
  49. * for use. Undefine to disable schedule structure
  50. * caching. (Only disable this on very low memory
  51. * machines)
  52. */
  53. #define SCHED_MAX_CACHE 128
  54. AST_THREADSTORAGE(last_del_id);
  55. struct sched {
  56. AST_LIST_ENTRY(sched) list;
  57. int id; /*!< ID number of event */
  58. struct timeval when; /*!< Absolute time event should take place */
  59. int resched; /*!< When to reschedule */
  60. int variable; /*!< Use return value from callback to reschedule */
  61. const void *data; /*!< Data */
  62. ast_sched_cb callback; /*!< Callback */
  63. ssize_t __heap_index;
  64. /*!
  65. * Used to synchronize between thread running a task and thread
  66. * attempting to delete a task
  67. */
  68. ast_cond_t cond;
  69. /*! Indication that a running task was deleted. */
  70. unsigned int deleted:1;
  71. };
  72. struct sched_thread {
  73. pthread_t thread;
  74. ast_cond_t cond;
  75. unsigned int stop:1;
  76. };
  77. struct ast_sched_context {
  78. ast_mutex_t lock;
  79. unsigned int eventcnt; /*!< Number of events processed */
  80. unsigned int highwater; /*!< highest count so far */
  81. struct ast_heap *sched_heap;
  82. struct sched_thread *sched_thread;
  83. /*! The scheduled task that is currently executing */
  84. struct sched *currently_executing;
  85. #ifdef SCHED_MAX_CACHE
  86. AST_LIST_HEAD_NOLOCK(, sched) schedc; /*!< Cache of unused schedule structures and how many */
  87. unsigned int schedccnt;
  88. #endif
  89. };
  90. static void *sched_run(void *data)
  91. {
  92. struct ast_sched_context *con = data;
  93. while (!con->sched_thread->stop) {
  94. int ms;
  95. struct timespec ts = {
  96. .tv_sec = 0,
  97. };
  98. ast_mutex_lock(&con->lock);
  99. if (con->sched_thread->stop) {
  100. ast_mutex_unlock(&con->lock);
  101. return NULL;
  102. }
  103. ms = ast_sched_wait(con);
  104. if (ms == -1) {
  105. ast_cond_wait(&con->sched_thread->cond, &con->lock);
  106. } else {
  107. struct timeval tv;
  108. tv = ast_tvadd(ast_tvnow(), ast_samp2tv(ms, 1000));
  109. ts.tv_sec = tv.tv_sec;
  110. ts.tv_nsec = tv.tv_usec * 1000;
  111. ast_cond_timedwait(&con->sched_thread->cond, &con->lock, &ts);
  112. }
  113. ast_mutex_unlock(&con->lock);
  114. if (con->sched_thread->stop) {
  115. return NULL;
  116. }
  117. ast_sched_runq(con);
  118. }
  119. return NULL;
  120. }
  121. static void sched_thread_destroy(struct ast_sched_context *con)
  122. {
  123. if (!con->sched_thread) {
  124. return;
  125. }
  126. if (con->sched_thread->thread != AST_PTHREADT_NULL) {
  127. ast_mutex_lock(&con->lock);
  128. con->sched_thread->stop = 1;
  129. ast_cond_signal(&con->sched_thread->cond);
  130. ast_mutex_unlock(&con->lock);
  131. pthread_join(con->sched_thread->thread, NULL);
  132. con->sched_thread->thread = AST_PTHREADT_NULL;
  133. }
  134. ast_cond_destroy(&con->sched_thread->cond);
  135. ast_free(con->sched_thread);
  136. con->sched_thread = NULL;
  137. }
  138. int ast_sched_start_thread(struct ast_sched_context *con)
  139. {
  140. struct sched_thread *st;
  141. if (con->sched_thread) {
  142. ast_log(LOG_ERROR, "Thread already started on this scheduler context\n");
  143. return -1;
  144. }
  145. if (!(st = ast_calloc(1, sizeof(*st)))) {
  146. return -1;
  147. }
  148. ast_cond_init(&st->cond, NULL);
  149. st->thread = AST_PTHREADT_NULL;
  150. con->sched_thread = st;
  151. if (ast_pthread_create_background(&st->thread, NULL, sched_run, con)) {
  152. ast_log(LOG_ERROR, "Failed to create scheduler thread\n");
  153. sched_thread_destroy(con);
  154. return -1;
  155. }
  156. return 0;
  157. }
  158. static int sched_time_cmp(void *a, void *b)
  159. {
  160. return ast_tvcmp(((struct sched *) b)->when, ((struct sched *) a)->when);
  161. }
  162. struct ast_sched_context *ast_sched_context_create(void)
  163. {
  164. struct ast_sched_context *tmp;
  165. if (!(tmp = ast_calloc(1, sizeof(*tmp)))) {
  166. return NULL;
  167. }
  168. ast_mutex_init(&tmp->lock);
  169. tmp->eventcnt = 1;
  170. if (!(tmp->sched_heap = ast_heap_create(8, sched_time_cmp,
  171. offsetof(struct sched, __heap_index)))) {
  172. ast_sched_context_destroy(tmp);
  173. return NULL;
  174. }
  175. return tmp;
  176. }
  177. static void sched_free(struct sched *task)
  178. {
  179. ast_cond_destroy(&task->cond);
  180. ast_free(task);
  181. }
  182. void ast_sched_context_destroy(struct ast_sched_context *con)
  183. {
  184. struct sched *s;
  185. sched_thread_destroy(con);
  186. con->sched_thread = NULL;
  187. ast_mutex_lock(&con->lock);
  188. #ifdef SCHED_MAX_CACHE
  189. while ((s = AST_LIST_REMOVE_HEAD(&con->schedc, list))) {
  190. sched_free(s);
  191. }
  192. #endif
  193. if (con->sched_heap) {
  194. while ((s = ast_heap_pop(con->sched_heap))) {
  195. sched_free(s);
  196. }
  197. ast_heap_destroy(con->sched_heap);
  198. con->sched_heap = NULL;
  199. }
  200. ast_mutex_unlock(&con->lock);
  201. ast_mutex_destroy(&con->lock);
  202. ast_free(con);
  203. }
  204. static struct sched *sched_alloc(struct ast_sched_context *con)
  205. {
  206. struct sched *tmp;
  207. /*
  208. * We keep a small cache of schedule entries
  209. * to minimize the number of necessary malloc()'s
  210. */
  211. #ifdef SCHED_MAX_CACHE
  212. if ((tmp = AST_LIST_REMOVE_HEAD(&con->schedc, list))) {
  213. con->schedccnt--;
  214. } else
  215. #endif
  216. {
  217. tmp = ast_calloc(1, sizeof(*tmp));
  218. ast_cond_init(&tmp->cond, NULL);
  219. }
  220. return tmp;
  221. }
  222. static void sched_release(struct ast_sched_context *con, struct sched *tmp)
  223. {
  224. /*
  225. * Add to the cache, or just free() if we
  226. * already have too many cache entries
  227. */
  228. #ifdef SCHED_MAX_CACHE
  229. if (con->schedccnt < SCHED_MAX_CACHE) {
  230. AST_LIST_INSERT_HEAD(&con->schedc, tmp, list);
  231. con->schedccnt++;
  232. } else
  233. #endif
  234. sched_free(tmp);
  235. }
  236. void ast_sched_clean_by_callback(struct ast_sched_context *con, ast_sched_cb match, ast_sched_cb cleanup_cb)
  237. {
  238. int i = 1;
  239. struct sched *current;
  240. ast_mutex_lock(&con->lock);
  241. while ((current = ast_heap_peek(con->sched_heap, i))) {
  242. if (current->callback != match) {
  243. i++;
  244. continue;
  245. }
  246. ast_heap_remove(con->sched_heap, current);
  247. cleanup_cb(current->data);
  248. sched_release(con, current);
  249. }
  250. ast_mutex_unlock(&con->lock);
  251. }
  252. /*! \brief
  253. * Return the number of milliseconds
  254. * until the next scheduled event
  255. */
  256. int ast_sched_wait(struct ast_sched_context *con)
  257. {
  258. int ms;
  259. struct sched *s;
  260. DEBUG(ast_debug(1, "ast_sched_wait()\n"));
  261. ast_mutex_lock(&con->lock);
  262. if ((s = ast_heap_peek(con->sched_heap, 1))) {
  263. ms = ast_tvdiff_ms(s->when, ast_tvnow());
  264. if (ms < 0) {
  265. ms = 0;
  266. }
  267. } else {
  268. ms = -1;
  269. }
  270. ast_mutex_unlock(&con->lock);
  271. return ms;
  272. }
  273. /*! \brief
  274. * Take a sched structure and put it in the
  275. * queue, such that the soonest event is
  276. * first in the list.
  277. */
  278. static void schedule(struct ast_sched_context *con, struct sched *s)
  279. {
  280. ast_heap_push(con->sched_heap, s);
  281. if (ast_heap_size(con->sched_heap) > con->highwater) {
  282. con->highwater = ast_heap_size(con->sched_heap);
  283. }
  284. }
  285. /*! \brief
  286. * given the last event *tv and the offset in milliseconds 'when',
  287. * computes the next value,
  288. */
  289. static int sched_settime(struct timeval *t, int when)
  290. {
  291. struct timeval now = ast_tvnow();
  292. /*ast_debug(1, "TV -> %lu,%lu\n", tv->tv_sec, tv->tv_usec);*/
  293. if (ast_tvzero(*t)) /* not supplied, default to now */
  294. *t = now;
  295. *t = ast_tvadd(*t, ast_samp2tv(when, 1000));
  296. if (ast_tvcmp(*t, now) < 0) {
  297. *t = now;
  298. }
  299. return 0;
  300. }
  301. int ast_sched_replace_variable(int old_id, struct ast_sched_context *con, int when, ast_sched_cb callback, const void *data, int variable)
  302. {
  303. /* 0 means the schedule item is new; do not delete */
  304. if (old_id > 0) {
  305. AST_SCHED_DEL(con, old_id);
  306. }
  307. return ast_sched_add_variable(con, when, callback, data, variable);
  308. }
  309. /*! \brief
  310. * Schedule callback(data) to happen when ms into the future
  311. */
  312. int ast_sched_add_variable(struct ast_sched_context *con, int when, ast_sched_cb callback, const void *data, int variable)
  313. {
  314. struct sched *tmp;
  315. int res = -1;
  316. DEBUG(ast_debug(1, "ast_sched_add()\n"));
  317. ast_mutex_lock(&con->lock);
  318. if ((tmp = sched_alloc(con))) {
  319. tmp->id = con->eventcnt++;
  320. tmp->callback = callback;
  321. tmp->data = data;
  322. tmp->resched = when;
  323. tmp->variable = variable;
  324. tmp->when = ast_tv(0, 0);
  325. tmp->deleted = 0;
  326. if (sched_settime(&tmp->when, when)) {
  327. sched_release(con, tmp);
  328. } else {
  329. schedule(con, tmp);
  330. res = tmp->id;
  331. }
  332. }
  333. #ifdef DUMP_SCHEDULER
  334. /* Dump contents of the context while we have the lock so nothing gets screwed up by accident. */
  335. if (option_debug)
  336. ast_sched_dump(con);
  337. #endif
  338. if (con->sched_thread) {
  339. ast_cond_signal(&con->sched_thread->cond);
  340. }
  341. ast_mutex_unlock(&con->lock);
  342. return res;
  343. }
  344. int ast_sched_replace(int old_id, struct ast_sched_context *con, int when, ast_sched_cb callback, const void *data)
  345. {
  346. if (old_id > -1) {
  347. AST_SCHED_DEL(con, old_id);
  348. }
  349. return ast_sched_add(con, when, callback, data);
  350. }
  351. int ast_sched_add(struct ast_sched_context *con, int when, ast_sched_cb callback, const void *data)
  352. {
  353. return ast_sched_add_variable(con, when, callback, data, 0);
  354. }
  355. static struct sched *sched_find(struct ast_sched_context *con, int id)
  356. {
  357. int x;
  358. size_t heap_size;
  359. heap_size = ast_heap_size(con->sched_heap);
  360. for (x = 1; x <= heap_size; x++) {
  361. struct sched *cur = ast_heap_peek(con->sched_heap, x);
  362. if (cur->id == id) {
  363. return cur;
  364. }
  365. }
  366. return NULL;
  367. }
  368. const void *ast_sched_find_data(struct ast_sched_context *con, int id)
  369. {
  370. struct sched *s;
  371. const void *data = NULL;
  372. ast_mutex_lock(&con->lock);
  373. s = sched_find(con, id);
  374. if (s) {
  375. data = s->data;
  376. }
  377. ast_mutex_unlock(&con->lock);
  378. return data;
  379. }
  380. /*! \brief
  381. * Delete the schedule entry with number
  382. * "id". It's nearly impossible that there
  383. * would be two or more in the list with that
  384. * id.
  385. */
  386. #ifndef AST_DEVMODE
  387. int ast_sched_del(struct ast_sched_context *con, int id)
  388. #else
  389. int _ast_sched_del(struct ast_sched_context *con, int id, const char *file, int line, const char *function)
  390. #endif
  391. {
  392. struct sched *s = NULL;
  393. int *last_id = ast_threadstorage_get(&last_del_id, sizeof(int));
  394. DEBUG(ast_debug(1, "ast_sched_del(%d)\n", id));
  395. if (id < 0) {
  396. return 0;
  397. }
  398. ast_mutex_lock(&con->lock);
  399. s = sched_find(con, id);
  400. if (s) {
  401. if (!ast_heap_remove(con->sched_heap, s)) {
  402. ast_log(LOG_WARNING,"sched entry %d not in the sched heap?\n", s->id);
  403. }
  404. sched_release(con, s);
  405. } else if (con->currently_executing && (id == con->currently_executing->id)) {
  406. s = con->currently_executing;
  407. s->deleted = 1;
  408. /* Wait for executing task to complete so that caller of ast_sched_del() does not
  409. * free memory out from under the task.
  410. */
  411. ast_cond_wait(&s->cond, &con->lock);
  412. /* Do not sched_release() here because ast_sched_runq() will do it */
  413. }
  414. #ifdef DUMP_SCHEDULER
  415. /* Dump contents of the context while we have the lock so nothing gets screwed up by accident. */
  416. if (option_debug)
  417. ast_sched_dump(con);
  418. #endif
  419. if (con->sched_thread) {
  420. ast_cond_signal(&con->sched_thread->cond);
  421. }
  422. ast_mutex_unlock(&con->lock);
  423. if (!s && *last_id != id) {
  424. ast_debug(1, "Attempted to delete nonexistent schedule entry %d!\n", id);
  425. #ifndef AST_DEVMODE
  426. ast_assert(s != NULL);
  427. #else
  428. {
  429. char buf[100];
  430. snprintf(buf, sizeof(buf), "s != NULL, id=%d", id);
  431. _ast_assert(0, buf, file, line, function);
  432. }
  433. #endif
  434. *last_id = id;
  435. return -1;
  436. } else if (!s) {
  437. return -1;
  438. }
  439. return 0;
  440. }
  441. void ast_sched_report(struct ast_sched_context *con, struct ast_str **buf, struct ast_cb_names *cbnames)
  442. {
  443. int i, x;
  444. struct sched *cur;
  445. int countlist[cbnames->numassocs + 1];
  446. size_t heap_size;
  447. memset(countlist, 0, sizeof(countlist));
  448. ast_str_set(buf, 0, " Highwater = %u\n schedcnt = %zu\n", con->highwater, ast_heap_size(con->sched_heap));
  449. ast_mutex_lock(&con->lock);
  450. heap_size = ast_heap_size(con->sched_heap);
  451. for (x = 1; x <= heap_size; x++) {
  452. cur = ast_heap_peek(con->sched_heap, x);
  453. /* match the callback to the cblist */
  454. for (i = 0; i < cbnames->numassocs; i++) {
  455. if (cur->callback == cbnames->cblist[i]) {
  456. break;
  457. }
  458. }
  459. if (i < cbnames->numassocs) {
  460. countlist[i]++;
  461. } else {
  462. countlist[cbnames->numassocs]++;
  463. }
  464. }
  465. ast_mutex_unlock(&con->lock);
  466. for (i = 0; i < cbnames->numassocs; i++) {
  467. ast_str_append(buf, 0, " %s : %d\n", cbnames->list[i], countlist[i]);
  468. }
  469. ast_str_append(buf, 0, " <unknown> : %d\n", countlist[cbnames->numassocs]);
  470. }
  471. /*! \brief Dump the contents of the scheduler to LOG_DEBUG */
  472. void ast_sched_dump(struct ast_sched_context *con)
  473. {
  474. struct sched *q;
  475. struct timeval when = ast_tvnow();
  476. int x;
  477. size_t heap_size;
  478. #ifdef SCHED_MAX_CACHE
  479. ast_debug(1, "Asterisk Schedule Dump (%zu in Q, %u Total, %u Cache, %u high-water)\n", ast_heap_size(con->sched_heap), con->eventcnt - 1, con->schedccnt, con->highwater);
  480. #else
  481. ast_debug(1, "Asterisk Schedule Dump (%zu in Q, %u Total, %u high-water)\n", ast_heap_size(con->sched_heap), con->eventcnt - 1, con->highwater);
  482. #endif
  483. ast_debug(1, "=============================================================\n");
  484. ast_debug(1, "|ID Callback Data Time (sec:ms) |\n");
  485. ast_debug(1, "+-----+-----------------+-----------------+-----------------+\n");
  486. ast_mutex_lock(&con->lock);
  487. heap_size = ast_heap_size(con->sched_heap);
  488. for (x = 1; x <= heap_size; x++) {
  489. struct timeval delta;
  490. q = ast_heap_peek(con->sched_heap, x);
  491. delta = ast_tvsub(q->when, when);
  492. ast_debug(1, "|%.4d | %-15p | %-15p | %.6ld : %.6ld |\n",
  493. q->id,
  494. q->callback,
  495. q->data,
  496. (long)delta.tv_sec,
  497. (long int)delta.tv_usec);
  498. }
  499. ast_mutex_unlock(&con->lock);
  500. ast_debug(1, "=============================================================\n");
  501. }
  502. /*! \brief
  503. * Launch all events which need to be run at this time.
  504. */
  505. int ast_sched_runq(struct ast_sched_context *con)
  506. {
  507. struct sched *current;
  508. struct timeval when;
  509. int numevents;
  510. int res;
  511. DEBUG(ast_debug(1, "ast_sched_runq()\n"));
  512. ast_mutex_lock(&con->lock);
  513. when = ast_tvadd(ast_tvnow(), ast_tv(0, 1000));
  514. for (numevents = 0; (current = ast_heap_peek(con->sched_heap, 1)); numevents++) {
  515. /* schedule all events which are going to expire within 1ms.
  516. * We only care about millisecond accuracy anyway, so this will
  517. * help us get more than one event at one time if they are very
  518. * close together.
  519. */
  520. if (ast_tvcmp(current->when, when) != -1) {
  521. break;
  522. }
  523. current = ast_heap_pop(con->sched_heap);
  524. /*
  525. * At this point, the schedule queue is still intact. We
  526. * have removed the first event and the rest is still there,
  527. * so it's permissible for the callback to add new events, but
  528. * trying to delete itself won't work because it isn't in
  529. * the schedule queue. If that's what it wants to do, it
  530. * should return 0.
  531. */
  532. con->currently_executing = current;
  533. ast_mutex_unlock(&con->lock);
  534. res = current->callback(current->data);
  535. ast_mutex_lock(&con->lock);
  536. con->currently_executing = NULL;
  537. ast_cond_signal(&current->cond);
  538. if (res && !current->deleted) {
  539. /*
  540. * If they return non-zero, we should schedule them to be
  541. * run again.
  542. */
  543. if (sched_settime(&current->when, current->variable? res : current->resched)) {
  544. sched_release(con, current);
  545. } else {
  546. schedule(con, current);
  547. }
  548. } else {
  549. /* No longer needed, so release it */
  550. sched_release(con, current);
  551. }
  552. }
  553. ast_mutex_unlock(&con->lock);
  554. return numevents;
  555. }
  556. long ast_sched_when(struct ast_sched_context *con,int id)
  557. {
  558. struct sched *s;
  559. long secs = -1;
  560. DEBUG(ast_debug(1, "ast_sched_when()\n"));
  561. ast_mutex_lock(&con->lock);
  562. s = sched_find(con, id);
  563. if (s) {
  564. struct timeval now = ast_tvnow();
  565. secs = s->when.tv_sec - now.tv_sec;
  566. }
  567. ast_mutex_unlock(&con->lock);
  568. return secs;
  569. }