kern_task.c

/*	$OpenBSD: kern_task.c,v 1.14 2015/02/09 03:15:41 dlg Exp $ */

/*
 * Copyright (c) 2013 David Gwynne <dlg@openbsd.org>
 *
 * Permission to use, copy, modify, and distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/malloc.h>
#include <sys/mutex.h>
#include <sys/kthread.h>
#include <sys/task.h>

#define TASK_ONQUEUE	1	/* task is linked on a taskq worklist */

struct taskq {
	enum {
		TQ_S_CREATED,	/* allocated, worker threads not yet forked */
		TQ_S_RUNNING,	/* worker threads are servicing the worklist */
		TQ_S_DESTROYED	/* taskq_destroy() has been called */
	} tq_state;
	unsigned int	tq_running;	/* number of live worker threads */
	unsigned int	tq_nthreads;	/* number of threads to run */
	unsigned int	tq_flags;	/* TASKQ_* flags */
	const char	*tq_name;

	struct mutex	tq_mtx;
	TAILQ_HEAD(, task) tq_worklist;
};

struct taskq taskq_sys = {
	TQ_S_CREATED,
	0,
	1,
	0,
	"systq",
	MUTEX_INITIALIZER(IPL_HIGH),
	TAILQ_HEAD_INITIALIZER(taskq_sys.tq_worklist)
};

struct taskq taskq_sys_mp = {
	TQ_S_CREATED,
	0,
	1,
	TASKQ_MPSAFE,
	"systqmp",
	MUTEX_INITIALIZER(IPL_HIGH),
	TAILQ_HEAD_INITIALIZER(taskq_sys_mp.tq_worklist)
};
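
/*
 * Editor's note (not in the original file): the system taskqs are
 * statically initialized so task_add() works before taskq_init() runs.
 * Enqueueing only needs tq_mtx and the worklist, both set up by the
 * initializers above; tasks queued early simply sit on the list until
 * the worker threads are forked.
 */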
typedef int (*sleepfn)(const volatile void *, struct mutex *, int,
    const char *, int);

struct taskq *const systq = &taskq_sys;
struct taskq *const systqmp = &taskq_sys_mp;

void	taskq_init(void); /* called in init_main.c */
void	taskq_create_thread(void *);
int	taskq_sleep(const volatile void *, struct mutex *, int,
	    const char *, int);
int	taskq_next_work(struct taskq *, struct task *, sleepfn);
void	taskq_thread(void *);

void
taskq_init(void)
{
	kthread_create_deferred(taskq_create_thread, systq);
	kthread_create_deferred(taskq_create_thread, systqmp);
}
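
/*
 * Usage sketch (editor's example, not in the original file; the driver
 * and softc names are hypothetical): a typical consumer embeds a
 * struct task in its softc, initializes it once with task_set(), and
 * schedules it onto the kernel-locked system taskq, e.g. from an
 * interrupt handler:
 *
 *	task_set(&sc->sc_linkstate_task, mydrv_linkstate, sc);
 *	task_add(systq, &sc->sc_linkstate_task);
 */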
struct taskq *
taskq_create(const char *name, unsigned int nthreads, int ipl,
    unsigned int flags)
{
	struct taskq *tq;

	tq = malloc(sizeof(*tq), M_DEVBUF, M_WAITOK);
	if (tq == NULL)
		return (NULL);

	tq->tq_state = TQ_S_CREATED;
	tq->tq_running = 0;
	tq->tq_nthreads = nthreads;
	tq->tq_name = name;
	tq->tq_flags = flags;

	mtx_init(&tq->tq_mtx, ipl);
	TAILQ_INIT(&tq->tq_worklist);

	/* try to create a thread to guarantee that tasks will be serviced */
	kthread_create_deferred(taskq_create_thread, tq);

	return (tq);
}
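
/*
 * Example (editor's sketch, hypothetical names): a driver that wants
 * its own worker thread at a specific interrupt protection level can
 * create a private taskq:
 *
 *	sc->sc_tq = taskq_create("mydrvtq", 1, IPL_NET, 0);
 *
 * The worker thread is forked later via kthread_create_deferred(), so
 * tasks added right after taskq_create() returns are only serviced
 * once that thread starts running.
 */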
void
taskq_destroy(struct taskq *tq)
{
	mtx_enter(&tq->tq_mtx);
	switch (tq->tq_state) {
	case TQ_S_CREATED:
		/* tq is still referenced by taskq_create_thread */
		tq->tq_state = TQ_S_DESTROYED;
		mtx_leave(&tq->tq_mtx);
		return;

	case TQ_S_RUNNING:
		tq->tq_state = TQ_S_DESTROYED;
		break;

	default:
		panic("unexpected %s tq state %u", tq->tq_name, tq->tq_state);
	}

	/* wake the workers so they notice the state change and exit */
	while (tq->tq_running > 0) {
		wakeup(tq);
		msleep(&tq->tq_running, &tq->tq_mtx, PWAIT, "tqdestroy", 0);
	}
	mtx_leave(&tq->tq_mtx);

	free(tq, M_DEVBUF, sizeof(*tq));
}
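
/*
 * Editor's note: pending tasks are still executed during teardown,
 * because taskq_next_work() only checks tq_state once the worklist is
 * empty; the workers drain the queue before exiting.  A caller that
 * does not want its task to run anymore should delete it first
 * (sketch, hypothetical softc):
 *
 *	task_del(sc->sc_tq, &sc->sc_task);
 *	taskq_destroy(sc->sc_tq);
 */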
void
taskq_create_thread(void *arg)
{
	struct taskq *tq = arg;
	int rv;

	mtx_enter(&tq->tq_mtx);

	switch (tq->tq_state) {
	case TQ_S_DESTROYED:
		mtx_leave(&tq->tq_mtx);
		free(tq, M_DEVBUF, sizeof(*tq));
		return;

	case TQ_S_CREATED:
		tq->tq_state = TQ_S_RUNNING;
		break;

	default:
		panic("unexpected %s tq state %d", tq->tq_name, tq->tq_state);
	}

	do {
		tq->tq_running++;
		mtx_leave(&tq->tq_mtx);

		rv = kthread_create(taskq_thread, tq, NULL, tq->tq_name);

		mtx_enter(&tq->tq_mtx);
		if (rv != 0) {
			printf("unable to create thread for \"%s\" taskq\n",
			    tq->tq_name);

			tq->tq_running--;
			/* could have been destroyed during kthread_create */
			if (tq->tq_state == TQ_S_DESTROYED &&
			    tq->tq_running == 0)
				wakeup_one(&tq->tq_running);
			break;
		}
	} while (tq->tq_running < tq->tq_nthreads);

	mtx_leave(&tq->tq_mtx);
}
void
task_set(struct task *t, void (*fn)(void *), void *arg)
{
	t->t_func = fn;
	t->t_arg = arg;
	t->t_flags = 0;
}

int
task_add(struct taskq *tq, struct task *w)
{
	int rv = 0;

	/* unlocked fast path: nothing to do if the task is already queued */
	if (ISSET(w->t_flags, TASK_ONQUEUE))
		return (0);

	mtx_enter(&tq->tq_mtx);
	/* recheck under the lock, the flag may have changed in the meantime */
	if (!ISSET(w->t_flags, TASK_ONQUEUE)) {
		rv = 1;
		SET(w->t_flags, TASK_ONQUEUE);
		TAILQ_INSERT_TAIL(&tq->tq_worklist, w, t_entry);
	}
	mtx_leave(&tq->tq_mtx);

	if (rv)
		wakeup_one(tq);

	return (rv);
}

int
task_del(struct taskq *tq, struct task *w)
{
	int rv = 0;

	/* unlocked fast path: nothing to do if the task is not queued */
	if (!ISSET(w->t_flags, TASK_ONQUEUE))
		return (0);

	mtx_enter(&tq->tq_mtx);
	if (ISSET(w->t_flags, TASK_ONQUEUE)) {
		rv = 1;
		CLR(w->t_flags, TASK_ONQUEUE);
		TAILQ_REMOVE(&tq->tq_worklist, w, t_entry);
	}
	mtx_leave(&tq->tq_mtx);

	return (rv);
}
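
/*
 * Example (editor's sketch, hypothetical names): the return values let
 * callers detect the enqueue/dequeue transition.  task_add() returns 1
 * only for the caller that actually put the task on the list, so the
 * task can be scheduled from several places yet run once per enqueue,
 * and bookkeeping such as a reference count can be paired with each
 * successful transition:
 *
 *	if (task_add(systq, &sc->sc_task))
 *		refcnt_take(&sc->sc_refs);
 */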
int
taskq_sleep(const volatile void *ident, struct mutex *mtx, int priority,
    const char *wmesg, int tmo)
{
	u_int *flags = &curproc->p_flag;
	int rv;

	atomic_clearbits_int(flags, P_CANTSLEEP);
	rv = msleep(ident, mtx, priority, wmesg, tmo);
	atomic_setbits_int(flags, P_CANTSLEEP);

	return (rv);
}
int
taskq_next_work(struct taskq *tq, struct task *work, sleepfn tqsleep)
{
	struct task *next;

	mtx_enter(&tq->tq_mtx);
	while ((next = TAILQ_FIRST(&tq->tq_worklist)) == NULL) {
		if (tq->tq_state != TQ_S_RUNNING) {
			mtx_leave(&tq->tq_mtx);
			return (0);
		}

		tqsleep(tq, &tq->tq_mtx, PWAIT, "bored", 0);
	}

	TAILQ_REMOVE(&tq->tq_worklist, next, t_entry);
	CLR(next->t_flags, TASK_ONQUEUE);

	*work = *next; /* copy to caller to avoid races */

	next = TAILQ_FIRST(&tq->tq_worklist);
	mtx_leave(&tq->tq_mtx);

	/* if there is more work, kick another worker thread */
	if (next != NULL)
		wakeup_one(tq);

	return (1);
}
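
/*
 * Editor's note: because taskq_next_work() copies the task out and
 * clears TASK_ONQUEUE before tq_mtx is released, the worker below
 * invokes t_func without the queue lock held, and the handler may
 * safely task_add() the same task again, even from within itself.
 */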
void
taskq_thread(void *xtq)
{
	sleepfn tqsleep = msleep;
	struct taskq *tq = xtq;
	struct task work;
	int last;

	/* MP-safe taskqs run their work without the big kernel lock */
	if (ISSET(tq->tq_flags, TASKQ_MPSAFE))
		KERNEL_UNLOCK();

	if (ISSET(tq->tq_flags, TASKQ_CANTSLEEP)) {
		tqsleep = taskq_sleep;
		atomic_setbits_int(&curproc->p_flag, P_CANTSLEEP);
	}

	while (taskq_next_work(tq, &work, tqsleep)) {
		(*work.t_func)(work.t_arg);
		sched_pause();
	}

	mtx_enter(&tq->tq_mtx);
	last = (--tq->tq_running == 0);
	mtx_leave(&tq->tq_mtx);

	if (ISSET(tq->tq_flags, TASKQ_MPSAFE))
		KERNEL_LOCK();

	if (ISSET(tq->tq_flags, TASKQ_CANTSLEEP))
		atomic_clearbits_int(&curproc->p_flag, P_CANTSLEEP);

	/* the last thread out wakes up taskq_destroy() */
	if (last)
		wakeup_one(&tq->tq_running);

	kthread_exit(0);
}