sched.c 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684
  1. /*
  2. * Asterisk -- An open source telephony toolkit.
  3. *
  4. * Copyright (C) 1999 - 2008, Digium, Inc.
  5. *
  6. * Mark Spencer <markster@digium.com>
  7. *
  8. * See http://www.asterisk.org for more information about
  9. * the Asterisk project. Please do not directly contact
  10. * any of the maintainers of this project for assistance;
  11. * the project provides a web site, mailing lists and IRC
  12. * channels for your use.
  13. *
  14. * This program is free software, distributed under the terms of
  15. * the GNU General Public License Version 2. See the LICENSE file
  16. * at the top of the source tree.
  17. */
  18. /*! \file
  19. *
  20. * \brief Scheduler Routines (from cheops-NG)
  21. *
  22. * \author Mark Spencer <markster@digium.com>
  23. */
  24. /*** MODULEINFO
  25. <support_level>core</support_level>
  26. ***/
  27. #include "asterisk.h"
  28. ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
  /*
   * DEBUG(a): scheduler trace hook.
   *
   * Without DEBUG_SCHEDULER the macro expands to nothing, so its argument is
   * never evaluated.  With DEBUG_SCHEDULER defined, the argument is handed to
   * DEBUG_M() whenever option_debug is non-zero.
   */
  #ifndef DEBUG_SCHEDULER
  #define DEBUG(a)
  #else
  #define DEBUG(a) do { if (option_debug) DEBUG_M(a) } while (0)
  #endif
  37. #include <sys/time.h>
  38. #include "asterisk/sched.h"
  39. #include "asterisk/channel.h"
  40. #include "asterisk/lock.h"
  41. #include "asterisk/utils.h"
  42. #include "asterisk/linkedlists.h"
  43. #include "asterisk/dlinkedlists.h"
  44. #include "asterisk/hashtab.h"
  45. #include "asterisk/heap.h"
  46. #include "asterisk/threadstorage.h"
  /*
   * Storage handle fetched with ast_threadstorage_get() by the delete routine,
   * which keeps a single int in it: the id of the last entry it reported as
   * nonexistent, so the same missing id is not reported twice in a row.
   * NOTE(review): presumably one instance per thread -- confirm against
   * asterisk/threadstorage.h.
   */
  47. AST_THREADSTORAGE(last_del_id);
  48. struct sched {
  49. AST_LIST_ENTRY(sched) list;
  50. int id; /*!< ID number of event */
  51. struct timeval when; /*!< Absolute time event should take place */
  52. int resched; /*!< When to reschedule */
  53. int variable; /*!< Use return value from callback to reschedule */
  54. const void *data; /*!< Data */
  55. ast_sched_cb callback; /*!< Callback */
  56. ssize_t __heap_index;
  57. };
  58. struct sched_context {
  59. ast_mutex_t lock;
  60. unsigned int eventcnt; /*!< Number of events processed */
  61. unsigned int schedcnt; /*!< Number of outstanding schedule events */
  62. unsigned int highwater; /*!< highest count so far */
  63. struct ast_hashtab *schedq_ht; /*!< hash table for fast searching */
  64. struct ast_heap *sched_heap;
  65. #ifdef SCHED_MAX_CACHE
  66. AST_LIST_HEAD_NOLOCK(, sched) schedc; /*!< Cache of unused schedule structures and how many */
  67. unsigned int schedccnt;
  68. #endif
  69. };
  70. struct ast_sched_thread {
  71. pthread_t thread;
  72. ast_mutex_t lock;
  73. ast_cond_t cond;
  74. struct sched_context *context;
  75. unsigned int stop:1;
  76. };
  77. static void *sched_run(void *data)
  78. {
  79. struct ast_sched_thread *st = data;
  80. while (!st->stop) {
  81. int ms;
  82. struct timespec ts = {
  83. .tv_sec = 0,
  84. };
  85. ast_mutex_lock(&st->lock);
  86. if (st->stop) {
  87. ast_mutex_unlock(&st->lock);
  88. return NULL;
  89. }
  90. ms = ast_sched_wait(st->context);
  91. if (ms == -1) {
  92. ast_cond_wait(&st->cond, &st->lock);
  93. } else {
  94. struct timeval tv;
  95. tv = ast_tvadd(ast_tvnow(), ast_samp2tv(ms, 1000));
  96. ts.tv_sec = tv.tv_sec;
  97. ts.tv_nsec = tv.tv_usec * 1000;
  98. ast_cond_timedwait(&st->cond, &st->lock, &ts);
  99. }
  100. ast_mutex_unlock(&st->lock);
  101. if (st->stop) {
  102. return NULL;
  103. }
  104. ast_sched_runq(st->context);
  105. }
  106. return NULL;
  107. }
  108. void ast_sched_thread_poke(struct ast_sched_thread *st)
  109. {
  110. ast_mutex_lock(&st->lock);
  111. ast_cond_signal(&st->cond);
  112. ast_mutex_unlock(&st->lock);
  113. }
  114. struct sched_context *ast_sched_thread_get_context(struct ast_sched_thread *st)
  115. {
  116. return st->context;
  117. }
  118. struct ast_sched_thread *ast_sched_thread_destroy(struct ast_sched_thread *st)
  119. {
  120. if (st->thread != AST_PTHREADT_NULL) {
  121. ast_mutex_lock(&st->lock);
  122. st->stop = 1;
  123. ast_cond_signal(&st->cond);
  124. ast_mutex_unlock(&st->lock);
  125. pthread_join(st->thread, NULL);
  126. st->thread = AST_PTHREADT_NULL;
  127. }
  128. ast_mutex_destroy(&st->lock);
  129. ast_cond_destroy(&st->cond);
  130. if (st->context) {
  131. sched_context_destroy(st->context);
  132. st->context = NULL;
  133. }
  134. ast_free(st);
  135. return NULL;
  136. }
  137. struct ast_sched_thread *ast_sched_thread_create(void)
  138. {
  139. struct ast_sched_thread *st;
  140. if (!(st = ast_calloc(1, sizeof(*st)))) {
  141. return NULL;
  142. }
  143. ast_mutex_init(&st->lock);
  144. ast_cond_init(&st->cond, NULL);
  145. st->thread = AST_PTHREADT_NULL;
  146. if (!(st->context = sched_context_create())) {
  147. ast_log(LOG_ERROR, "Failed to create scheduler\n");
  148. ast_sched_thread_destroy(st);
  149. return NULL;
  150. }
  151. if (ast_pthread_create_background(&st->thread, NULL, sched_run, st)) {
  152. ast_log(LOG_ERROR, "Failed to create scheduler thread\n");
  153. ast_sched_thread_destroy(st);
  154. return NULL;
  155. }
  156. return st;
  157. }
  158. int ast_sched_thread_add_variable(struct ast_sched_thread *st, int when, ast_sched_cb cb,
  159. const void *data, int variable)
  160. {
  161. int res;
  162. ast_mutex_lock(&st->lock);
  163. res = ast_sched_add_variable(st->context, when, cb, data, variable);
  164. if (res != -1) {
  165. ast_cond_signal(&st->cond);
  166. }
  167. ast_mutex_unlock(&st->lock);
  168. return res;
  169. }
  170. int ast_sched_thread_add(struct ast_sched_thread *st, int when, ast_sched_cb cb,
  171. const void *data)
  172. {
  173. int res;
  174. ast_mutex_lock(&st->lock);
  175. res = ast_sched_add(st->context, when, cb, data);
  176. if (res != -1) {
  177. ast_cond_signal(&st->cond);
  178. }
  179. ast_mutex_unlock(&st->lock);
  180. return res;
  181. }
  182. /* hash routines for sched */
  183. static int sched_cmp(const void *a, const void *b)
  184. {
  185. const struct sched *as = a;
  186. const struct sched *bs = b;
  187. return as->id != bs->id; /* return 0 on a match like strcmp would */
  188. }
  189. static unsigned int sched_hash(const void *obj)
  190. {
  191. const struct sched *s = obj;
  192. unsigned int h = s->id;
  193. return h;
  194. }
  195. static int sched_time_cmp(void *a, void *b)
  196. {
  197. return ast_tvcmp(((struct sched *) b)->when, ((struct sched *) a)->when);
  198. }
  199. struct sched_context *sched_context_create(void)
  200. {
  201. struct sched_context *tmp;
  202. if (!(tmp = ast_calloc(1, sizeof(*tmp))))
  203. return NULL;
  204. ast_mutex_init(&tmp->lock);
  205. tmp->eventcnt = 1;
  206. tmp->schedq_ht = ast_hashtab_create(23, sched_cmp, ast_hashtab_resize_java, ast_hashtab_newsize_java, sched_hash, 1);
  207. if (!(tmp->sched_heap = ast_heap_create(8, sched_time_cmp,
  208. offsetof(struct sched, __heap_index)))) {
  209. sched_context_destroy(tmp);
  210. return NULL;
  211. }
  212. return tmp;
  213. }
  214. void sched_context_destroy(struct sched_context *con)
  215. {
  216. struct sched *s;
  217. ast_mutex_lock(&con->lock);
  218. #ifdef SCHED_MAX_CACHE
  219. /* Eliminate the cache */
  220. while ((s = AST_LIST_REMOVE_HEAD(&con->schedc, list)))
  221. ast_free(s);
  222. #endif
  223. if (con->sched_heap) {
  224. while ((s = ast_heap_pop(con->sched_heap))) {
  225. ast_free(s);
  226. }
  227. ast_heap_destroy(con->sched_heap);
  228. con->sched_heap = NULL;
  229. }
  230. ast_hashtab_destroy(con->schedq_ht, NULL);
  231. con->schedq_ht = NULL;
  232. /* And the context */
  233. ast_mutex_unlock(&con->lock);
  234. ast_mutex_destroy(&con->lock);
  235. ast_free(con);
  236. }
  237. static struct sched *sched_alloc(struct sched_context *con)
  238. {
  239. struct sched *tmp;
  240. /*
  241. * We keep a small cache of schedule entries
  242. * to minimize the number of necessary malloc()'s
  243. */
  244. #ifdef SCHED_MAX_CACHE
  245. if ((tmp = AST_LIST_REMOVE_HEAD(&con->schedc, list)))
  246. con->schedccnt--;
  247. else
  248. #endif
  249. tmp = ast_calloc(1, sizeof(*tmp));
  250. return tmp;
  251. }
  /*!
   * \brief Give back a schedule entry that is no longer needed.
   *
   * With SCHED_MAX_CACHE defined, the entry is pushed onto the context's
   * cache while the cache holds fewer than SCHED_MAX_CACHE entries.  In every
   * other case the entry is freed.
   *
   * \param con  scheduler context owning the cache
   * \param tmp  entry to release
   */
  static void sched_release(struct sched_context *con, struct sched *tmp)
  {
  #ifdef SCHED_MAX_CACHE
      if (con->schedccnt < SCHED_MAX_CACHE) {
          AST_LIST_INSERT_HEAD(&con->schedc, tmp, list);
          con->schedccnt++;
          return;
      }
  #endif

      ast_free(tmp);
  }
  266. /*! \brief
  267. * Return the number of milliseconds
  268. * until the next scheduled event
  269. */
  270. int ast_sched_wait(struct sched_context *con)
  271. {
  272. int ms;
  273. struct sched *s;
  274. DEBUG(ast_debug(1, "ast_sched_wait()\n"));
  275. ast_mutex_lock(&con->lock);
  276. if ((s = ast_heap_peek(con->sched_heap, 1))) {
  277. ms = ast_tvdiff_ms(s->when, ast_tvnow());
  278. if (ms < 0) {
  279. ms = 0;
  280. }
  281. } else {
  282. ms = -1;
  283. }
  284. ast_mutex_unlock(&con->lock);
  285. return ms;
  286. }
  287. /*! \brief
  288. * Take a sched structure and put it in the
  289. * queue, such that the soonest event is
  290. * first in the list.
  291. */
  292. static void schedule(struct sched_context *con, struct sched *s)
  293. {
  294. ast_heap_push(con->sched_heap, s);
  295. if (!ast_hashtab_insert_safe(con->schedq_ht, s)) {
  296. ast_log(LOG_WARNING,"Schedule Queue entry %d is already in table!\n", s->id);
  297. }
  298. con->schedcnt++;
  299. if (con->schedcnt > con->highwater) {
  300. con->highwater = con->schedcnt;
  301. }
  302. }
  303. /*! \brief
  304. * given the last event *tv and the offset in milliseconds 'when',
  305. * computes the next value,
  306. */
  307. static int sched_settime(struct timeval *t, int when)
  308. {
  309. struct timeval now = ast_tvnow();
  310. /*ast_debug(1, "TV -> %lu,%lu\n", tv->tv_sec, tv->tv_usec);*/
  311. if (ast_tvzero(*t)) /* not supplied, default to now */
  312. *t = now;
  313. *t = ast_tvadd(*t, ast_samp2tv(when, 1000));
  314. if (ast_tvcmp(*t, now) < 0) {
  315. *t = now;
  316. }
  317. return 0;
  318. }
  319. int ast_sched_replace_variable(int old_id, struct sched_context *con, int when, ast_sched_cb callback, const void *data, int variable)
  320. {
  321. /* 0 means the schedule item is new; do not delete */
  322. if (old_id > 0) {
  323. AST_SCHED_DEL(con, old_id);
  324. }
  325. return ast_sched_add_variable(con, when, callback, data, variable);
  326. }
  327. /*! \brief
  328. * Schedule callback(data) to happen when ms into the future
  329. */
  330. int ast_sched_add_variable(struct sched_context *con, int when, ast_sched_cb callback, const void *data, int variable)
  331. {
  332. struct sched *tmp;
  333. int res = -1;
  334. DEBUG(ast_debug(1, "ast_sched_add()\n"));
  335. ast_mutex_lock(&con->lock);
  336. if ((tmp = sched_alloc(con))) {
  337. tmp->id = con->eventcnt++;
  338. tmp->callback = callback;
  339. tmp->data = data;
  340. tmp->resched = when;
  341. tmp->variable = variable;
  342. tmp->when = ast_tv(0, 0);
  343. if (sched_settime(&tmp->when, when)) {
  344. sched_release(con, tmp);
  345. } else {
  346. schedule(con, tmp);
  347. res = tmp->id;
  348. }
  349. }
  350. #ifdef DUMP_SCHEDULER
  351. /* Dump contents of the context while we have the lock so nothing gets screwed up by accident. */
  352. if (option_debug)
  353. ast_sched_dump(con);
  354. #endif
  355. ast_mutex_unlock(&con->lock);
  356. return res;
  357. }
  358. int ast_sched_replace(int old_id, struct sched_context *con, int when, ast_sched_cb callback, const void *data)
  359. {
  360. if (old_id > -1) {
  361. AST_SCHED_DEL(con, old_id);
  362. }
  363. return ast_sched_add(con, when, callback, data);
  364. }
  365. int ast_sched_add(struct sched_context *con, int when, ast_sched_cb callback, const void *data)
  366. {
  367. return ast_sched_add_variable(con, when, callback, data, 0);
  368. }
  369. const void *ast_sched_find_data(struct sched_context *con, int id)
  370. {
  371. struct sched tmp,*res;
  372. tmp.id = id;
  373. res = ast_hashtab_lookup(con->schedq_ht, &tmp);
  374. if (res)
  375. return res->data;
  376. return NULL;
  377. }
  378. /*! \brief
  379. * Delete the schedule entry with number
  380. * "id". It's nearly impossible that there
  381. * would be two or more in the list with that
  382. * id.
  383. */
  384. #ifndef AST_DEVMODE
  385. int ast_sched_del(struct sched_context *con, int id)
  386. #else
  387. int _ast_sched_del(struct sched_context *con, int id, const char *file, int line, const char *function)
  388. #endif
  389. {
  390. struct sched *s, tmp = {
  391. .id = id,
  392. };
  393. int *last_id = ast_threadstorage_get(&last_del_id, sizeof(int));
  394. DEBUG(ast_debug(1, "ast_sched_del(%d)\n", id));
  395. if (id < 0) {
  396. return 0;
  397. }
  398. ast_mutex_lock(&con->lock);
  399. s = ast_hashtab_lookup(con->schedq_ht, &tmp);
  400. if (s) {
  401. if (!ast_heap_remove(con->sched_heap, s)) {
  402. ast_log(LOG_WARNING,"sched entry %d not in the sched heap?\n", s->id);
  403. }
  404. if (!ast_hashtab_remove_this_object(con->schedq_ht, s)) {
  405. ast_log(LOG_WARNING,"Found sched entry %d, then couldn't remove it?\n", s->id);
  406. }
  407. con->schedcnt--;
  408. sched_release(con, s);
  409. }
  410. #ifdef DUMP_SCHEDULER
  411. /* Dump contents of the context while we have the lock so nothing gets screwed up by accident. */
  412. if (option_debug)
  413. ast_sched_dump(con);
  414. #endif
  415. ast_mutex_unlock(&con->lock);
  416. if (!s && *last_id != id) {
  417. ast_debug(1, "Attempted to delete nonexistent schedule entry %d!\n", id);
  418. #ifndef AST_DEVMODE
  419. ast_assert(s != NULL);
  420. #else
  421. {
  422. char buf[100];
  423. snprintf(buf, sizeof(buf), "s != NULL, id=%d", id);
  424. _ast_assert(0, buf, file, line, function);
  425. }
  426. #endif
  427. *last_id = id;
  428. return -1;
  429. } else if (!s) {
  430. return -1;
  431. }
  432. return 0;
  433. }
  434. void ast_sched_report(struct sched_context *con, struct ast_str **buf, struct ast_cb_names *cbnames)
  435. {
  436. int i, x;
  437. struct sched *cur;
  438. int countlist[cbnames->numassocs + 1];
  439. size_t heap_size;
  440. memset(countlist, 0, sizeof(countlist));
  441. ast_str_set(buf, 0, " Highwater = %u\n schedcnt = %u\n", con->highwater, con->schedcnt);
  442. ast_mutex_lock(&con->lock);
  443. heap_size = ast_heap_size(con->sched_heap);
  444. for (x = 1; x <= heap_size; x++) {
  445. cur = ast_heap_peek(con->sched_heap, x);
  446. /* match the callback to the cblist */
  447. for (i = 0; i < cbnames->numassocs; i++) {
  448. if (cur->callback == cbnames->cblist[i]) {
  449. break;
  450. }
  451. }
  452. if (i < cbnames->numassocs) {
  453. countlist[i]++;
  454. } else {
  455. countlist[cbnames->numassocs]++;
  456. }
  457. }
  458. ast_mutex_unlock(&con->lock);
  459. for (i = 0; i < cbnames->numassocs; i++) {
  460. ast_str_append(buf, 0, " %s : %d\n", cbnames->list[i], countlist[i]);
  461. }
  462. ast_str_append(buf, 0, " <unknown> : %d\n", countlist[cbnames->numassocs]);
  463. }
  464. /*! \brief Dump the contents of the scheduler to LOG_DEBUG */
  465. void ast_sched_dump(struct sched_context *con)
  466. {
  467. struct sched *q;
  468. struct timeval when = ast_tvnow();
  469. int x;
  470. size_t heap_size;
  471. #ifdef SCHED_MAX_CACHE
  472. ast_debug(1, "Asterisk Schedule Dump (%u in Q, %u Total, %u Cache, %u high-water)\n", con->schedcnt, con->eventcnt - 1, con->schedccnt, con->highwater);
  473. #else
  474. ast_debug(1, "Asterisk Schedule Dump (%u in Q, %u Total, %u high-water)\n", con->schedcnt, con->eventcnt - 1, con->highwater);
  475. #endif
  476. ast_debug(1, "=============================================================\n");
  477. ast_debug(1, "|ID Callback Data Time (sec:ms) |\n");
  478. ast_debug(1, "+-----+-----------------+-----------------+-----------------+\n");
  479. ast_mutex_lock(&con->lock);
  480. heap_size = ast_heap_size(con->sched_heap);
  481. for (x = 1; x <= heap_size; x++) {
  482. struct timeval delta;
  483. q = ast_heap_peek(con->sched_heap, x);
  484. delta = ast_tvsub(q->when, when);
  485. ast_debug(1, "|%.4d | %-15p | %-15p | %.6ld : %.6ld |\n",
  486. q->id,
  487. q->callback,
  488. q->data,
  489. (long)delta.tv_sec,
  490. (long int)delta.tv_usec);
  491. }
  492. ast_mutex_unlock(&con->lock);
  493. ast_debug(1, "=============================================================\n");
  494. }
  495. /*! \brief
  496. * Launch all events which need to be run at this time.
  497. */
  498. int ast_sched_runq(struct sched_context *con)
  499. {
  500. struct sched *current;
  501. struct timeval when;
  502. int numevents;
  503. int res;
  504. DEBUG(ast_debug(1, "ast_sched_runq()\n"));
  505. ast_mutex_lock(&con->lock);
  506. when = ast_tvadd(ast_tvnow(), ast_tv(0, 1000));
  507. for (numevents = 0; (current = ast_heap_peek(con->sched_heap, 1)); numevents++) {
  508. /* schedule all events which are going to expire within 1ms.
  509. * We only care about millisecond accuracy anyway, so this will
  510. * help us get more than one event at one time if they are very
  511. * close together.
  512. */
  513. if (ast_tvcmp(current->when, when) != -1) {
  514. break;
  515. }
  516. current = ast_heap_pop(con->sched_heap);
  517. if (!ast_hashtab_remove_this_object(con->schedq_ht, current)) {
  518. ast_log(LOG_ERROR,"Sched entry %d was in the schedq list but not in the hashtab???\n", current->id);
  519. }
  520. con->schedcnt--;
  521. /*
  522. * At this point, the schedule queue is still intact. We
  523. * have removed the first event and the rest is still there,
  524. * so it's permissible for the callback to add new events, but
  525. * trying to delete itself won't work because it isn't in
  526. * the schedule queue. If that's what it wants to do, it
  527. * should return 0.
  528. */
  529. ast_mutex_unlock(&con->lock);
  530. res = current->callback(current->data);
  531. ast_mutex_lock(&con->lock);
  532. if (res) {
  533. /*
  534. * If they return non-zero, we should schedule them to be
  535. * run again.
  536. */
  537. if (sched_settime(&current->when, current->variable? res : current->resched)) {
  538. sched_release(con, current);
  539. } else {
  540. schedule(con, current);
  541. }
  542. } else {
  543. /* No longer needed, so release it */
  544. sched_release(con, current);
  545. }
  546. }
  547. ast_mutex_unlock(&con->lock);
  548. return numevents;
  549. }
  550. long ast_sched_when(struct sched_context *con,int id)
  551. {
  552. struct sched *s, tmp;
  553. long secs = -1;
  554. DEBUG(ast_debug(1, "ast_sched_when()\n"));
  555. ast_mutex_lock(&con->lock);
  556. /* these next 2 lines replace a lookup loop */
  557. tmp.id = id;
  558. s = ast_hashtab_lookup(con->schedq_ht, &tmp);
  559. if (s) {
  560. struct timeval now = ast_tvnow();
  561. secs = s->when.tv_sec - now.tv_sec;
  562. }
  563. ast_mutex_unlock(&con->lock);
  564. return secs;
  565. }