work.c
/*
 * Copyright (c) 2013-2014 Richard Braun.
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include <assert.h>
#include <errno.h>
#include <stdalign.h>
#include <stddef.h>
#include <stdio.h>

#include <kern/bitmap.h>
#include <kern/init.h>
#include <kern/kmem.h>
#include <kern/list.h>
#include <kern/log.h>
#include <kern/macros.h>
#include <kern/panic.h>
#include <kern/percpu.h>
#include <kern/spinlock.h>
#include <kern/syscnt.h>
#include <kern/thread.h>
#include <kern/work.h>

#include <machine/cpu.h>

#define WORK_PRIO_NORMAL   THREAD_SCHED_FS_PRIO_DEFAULT
#define WORK_PRIO_HIGH     THREAD_SCHED_FS_PRIO_MAX

#define WORK_INVALID_CPU   ((unsigned int)-1)

// Keep at least that many threads alive when a work pool is idle.
#define WORK_THREADS_SPARE 4

/*
 * When computing the maximum number of worker threads, start by multiplying
 * the number of processors by the ratio below. If the result is greater than
 * the threshold, retry with a decreased ratio, until either the result is
 * no longer greater than the threshold or the ratio is 1.
 */
#define WORK_THREADS_RATIO     4
#define WORK_THREADS_THRESHOLD 512
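
/*
 * For illustration: with 8 processors, the computed maximum is 8 * 4 = 32
 * worker threads. With 256 processors, 256 * 4 and 256 * 3 both exceed the
 * threshold, so the ratio is lowered to 2 and the maximum becomes
 * 256 * 2 = 512. See work_pool_compute_max_threads() below.
 */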

#define WORK_MAX_THREADS   MAX (CONFIG_MAX_CPUS, WORK_THREADS_THRESHOLD)

// Work pool flags.
#define WORK_PF_GLOBAL     0x1   // System-wide work queue.
#define WORK_PF_HIGHPRIO   0x2   // High priority worker threads.

struct work_thread
{
  struct list node;
  struct thread *thread;
  struct work_pool *pool;
  uint32_t id;
};

/*
 * Pool of threads and works.
 *
 * Interrupts must be disabled when accessing a work pool. Holding the
 * lock is required for global pools only, whereas exclusive access on
 * per-processor pools is achieved by disabling preemption.
 *
 * There are two internal queues of pending works. When first scheduling
 * a work, it is inserted into queue0. After a periodic event, works still
 * present in queue0 are moved to queue1. If these works are still present
 * in queue1 at the next periodic event, it means they couldn't be processed
 * for a complete period between two periodic events, at which point it is
 * assumed that processing works on the same processor they were queued on
 * becomes less relevant. As a result, periodic events also trigger the
 * transfer of works from queue1 to the matching global pool. Global pools
 * only use one queue.
 */
struct work_pool
{
  __cacheline_aligned struct spinlock lock;
  int flags;
  struct work_queue queue0;
  struct work_queue queue1;
  struct work_thread *manager;
  struct syscnt sc_transfers;
  uint32_t cpu;
  uint32_t max_threads;
  uint32_t nr_threads;
  uint32_t nr_available_threads;
  struct list available_threads;
  struct list dead_threads;
  BITMAP_DECLARE (bitmap, WORK_MAX_THREADS);
};
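
/*
 * Illustration of the queue aging described above (per-processor pools
 * only): a work scheduled right after a periodic event waits in queue0
 * until the next event, is then moved to queue1, and, if still pending at
 * the event after that, is transferred to the matching global pool by
 * work_report_periodic_event(). A work therefore stays bound to its
 * original processor for at most two periods.
 */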

static int work_thread_create (struct work_pool *pool, uint32_t id);

static struct work_pool work_pool_cpu_main __percpu;
static struct work_pool work_pool_cpu_highprio __percpu;
static struct work_pool work_pool_main;
static struct work_pool work_pool_highprio;

static struct kmem_cache work_thread_cache;

static unsigned int
work_pool_alloc_id (struct work_pool *pool)
{
  assert (pool->nr_threads < pool->max_threads);
  ++pool->nr_threads;

  int bit = bitmap_find_first_zero (pool->bitmap, pool->max_threads);
  assert (bit >= 0);
  bitmap_set (pool->bitmap, bit);
  return (bit);
}

static void
work_pool_free_id (struct work_pool *pool, unsigned int id)
{
  assert (pool->nr_threads != 0);
  --pool->nr_threads;
  bitmap_clear (pool->bitmap, id);
}

static unsigned int
work_pool_cpu_id (const struct work_pool *pool)
{
  assert (!(pool->flags & WORK_PF_GLOBAL));
  return (pool->cpu);
}

static unsigned int
work_pool_compute_max_threads (uint32_t nr_cpus)
{
  uint32_t ratio = WORK_THREADS_RATIO, max_threads = nr_cpus * ratio;

  for (; ratio > 1 && max_threads > WORK_THREADS_THRESHOLD;
       max_threads = nr_cpus * (--ratio))
    ;

  assert (max_threads);
  assert (max_threads <= WORK_MAX_THREADS);
  return (max_threads);
}

static void __init
work_pool_init (struct work_pool *pool)
{
  spinlock_init (&pool->lock);
  work_queue_init (&pool->queue0);
  work_queue_init (&pool->queue1);
  pool->manager = NULL;
}

static void __init
work_pool_build (struct work_pool *pool, uint32_t cpu, int flags)
{
  pool->flags = flags;

  uint32_t nr_cpus = 1;

  if (flags & WORK_PF_GLOBAL)
    {
      nr_cpus = cpu_count ();
      pool->cpu = WORK_INVALID_CPU;
    }
  else
    {
      const char *suffix = (flags & WORK_PF_HIGHPRIO) ? "h" : "";
      char name[SYSCNT_NAME_SIZE];
      snprintf (name, sizeof (name), "work_transfers/%u%s", cpu, suffix);
      syscnt_register (&pool->sc_transfers, name);
      pool->cpu = cpu;
    }

  pool->max_threads = work_pool_compute_max_threads (nr_cpus);
  pool->nr_threads = 0;
  pool->nr_available_threads = 0;
  list_init (&pool->available_threads);
  list_init (&pool->dead_threads);
  bitmap_zero (pool->bitmap, WORK_MAX_THREADS);

  uint32_t id = work_pool_alloc_id (pool);
  if (work_thread_create (pool, id) != 0)
    panic ("work: unable to create initial worker thread");
}

static struct work_pool*
work_pool_cpu_select (int flags)
{
  return ((flags & WORK_HIGHPRIO) ?
          cpu_local_ptr (work_pool_cpu_highprio) :
          cpu_local_ptr (work_pool_cpu_main));
}

static void
work_pool_acquire (struct work_pool *pool, cpu_flags_t *flags)
{
  if (pool->flags & WORK_PF_GLOBAL)
    spinlock_lock_intr_save (&pool->lock, flags);
  else
    thread_preempt_disable_intr_save (flags);
}

static void
work_pool_release (struct work_pool *pool, cpu_flags_t flags)
{
  if (pool->flags & WORK_PF_GLOBAL)
    spinlock_unlock_intr_restore (&pool->lock, flags);
  else
    thread_preempt_enable_intr_restore (flags);
}

static int
work_pool_nr_works (const struct work_pool *pool)
{
  return (work_queue_nr_works (&pool->queue0) +
          work_queue_nr_works (&pool->queue1));
}

static struct work*
work_pool_pop_work (struct work_pool *pool)
{
  if (!(pool->flags & WORK_PF_GLOBAL) &&
      work_queue_nr_works (&pool->queue1) != 0)
    return (work_queue_pop (&pool->queue1));

  return (work_queue_pop (&pool->queue0));
}

static void
work_pool_wakeup_manager (struct work_pool *pool)
{
  if (work_pool_nr_works (pool) && pool->manager)
    thread_wakeup (pool->manager->thread);
}

static void
work_pool_shift_queues (struct work_pool *pool, struct work_queue *old_queue)
{
  assert (!(pool->flags & WORK_PF_GLOBAL));

  work_queue_transfer (old_queue, &pool->queue1);
  work_queue_transfer (&pool->queue1, &pool->queue0);
  work_queue_init (&pool->queue0);

  if (work_queue_nr_works (old_queue))
    syscnt_inc (&pool->sc_transfers);
}

static void
work_pool_push_work (struct work_pool *pool, struct work *work)
{
  work_queue_push (&pool->queue0, work);
  work_pool_wakeup_manager (pool);
}

static void
work_pool_concat_queue (struct work_pool *pool, struct work_queue *queue)
{
  work_queue_concat (&pool->queue0, queue);
  work_pool_wakeup_manager (pool);
}

static void
work_thread_destroy (struct work_thread *worker)
{
  thread_join (worker->thread);
  kmem_cache_free (&work_thread_cache, worker);
}

static void
work_process (void *arg)
{
  struct work_thread *self = arg;
  struct work_pool *pool = self->pool;
  struct spinlock *lock = (pool->flags & WORK_PF_GLOBAL) ? &pool->lock : NULL;

  cpu_flags_t flags;
  work_pool_acquire (pool, &flags);

  while (1)
    {
      if (pool->manager != NULL)
        {
          list_insert_tail (&pool->available_threads, &self->node);
          ++pool->nr_available_threads;

          do
            thread_sleep (lock, pool, "work_spr");
          while (pool->manager);

          list_remove (&self->node);
          --pool->nr_available_threads;
        }

      if (!list_empty (&pool->dead_threads))
        {
          _Auto worker = list_first_entry (&pool->dead_threads,
                                           struct work_thread, node);
          list_remove (&worker->node);
          work_pool_release (pool, flags);

          uint32_t id = worker->id;
          work_thread_destroy (worker);

          /*
           * Release worker ID last so that, if the pool is full, no new
           * worker can be created unless all the resources of the worker
           * being destroyed have been freed. This is important to enforce
           * a strict boundary on the total amount of resources allocated
           * for a pool at any time.
           */
          work_pool_acquire (pool, &flags);
          work_pool_free_id (pool, id);
          continue;
        }

      if (!work_pool_nr_works (pool))
        {
          if (pool->nr_threads > WORK_THREADS_SPARE)
            break;

          pool->manager = self;

          do
            thread_sleep (lock, pool, "work_mgr");
          while (!work_pool_nr_works (pool));

          pool->manager = NULL;
        }

      _Auto work = work_pool_pop_work (pool);

      if (work_pool_nr_works (pool))
        {
          if (pool->nr_available_threads != 0)
            {
              _Auto worker = list_first_entry (&pool->available_threads,
                                               struct work_thread, node);
              thread_wakeup (worker->thread);
            }
          else if (pool->nr_threads < pool->max_threads)
            {
              uint32_t id = work_pool_alloc_id (pool);
              work_pool_release (pool, flags);

              int error = work_thread_create (pool, id);
              work_pool_acquire (pool, &flags);

              if (error)
                {
                  work_pool_free_id (pool, id);
                  log_warning ("work: unable to create worker thread");
                }
            }
        }

      work_pool_release (pool, flags);
      work->fn (work);
      work_pool_acquire (pool, &flags);
    }

  list_insert_tail (&pool->dead_threads, &self->node);
  work_pool_release (pool, flags);
}

static int
work_thread_create (struct work_pool *pool, uint32_t id)
{
  struct work_thread *worker = kmem_cache_alloc (&work_thread_cache);
  if (! worker)
    return (ENOMEM);

  worker->pool = pool;
  worker->id = id;

  const char *suffix;
  uint16_t priority;
  int error;

  if (pool->flags & WORK_PF_HIGHPRIO)
    {
      suffix = "h";
      priority = WORK_PRIO_HIGH;
    }
  else
    {
      suffix = "";
      priority = WORK_PRIO_NORMAL;
    }

  struct cpumap *cpumap;
  char name[THREAD_NAME_SIZE];

  if (pool->flags & WORK_PF_GLOBAL)
    {
      cpumap = NULL;
      snprintf (name, sizeof (name),
                THREAD_KERNEL_PREFIX "work_process/g:%u%s",
                worker->id, suffix);
    }
  else
    {
      error = cpumap_create (&cpumap);
      if (error)
        goto error_cpumap;

      uint32_t pool_id = work_pool_cpu_id (pool);
      cpumap_zero (cpumap);
      cpumap_set (cpumap, pool_id);
      snprintf (name, sizeof (name),
                THREAD_KERNEL_PREFIX "work_process/%u:%u%s",
                pool_id, worker->id, suffix);
    }

  struct thread_attr attr;
  thread_attr_init (&attr, name);
  thread_attr_set_priority (&attr, priority);

  if (cpumap)
    thread_attr_set_cpumap (&attr, cpumap);

  error = thread_create (&worker->thread, &attr, work_process, worker);

  if (cpumap)
    cpumap_destroy (cpumap);

  if (error)
    goto error_thread;

  return (0);

error_thread:
error_cpumap:
  kmem_cache_free (&work_thread_cache, worker);
  return (error);
}

static int __init
work_bootstrap (void)
{
  work_pool_init (cpu_local_ptr (work_pool_cpu_main));
  work_pool_init (cpu_local_ptr (work_pool_cpu_highprio));
  return (0);
}

INIT_OP_DEFINE (work_bootstrap,
                INIT_OP_DEP (cpu_setup, true),
                INIT_OP_DEP (spinlock_setup, true),
                INIT_OP_DEP (thread_bootstrap, true));

static int __init
work_setup (void)
{
  kmem_cache_init (&work_thread_cache, "work_thread",
                   sizeof (struct work_thread), 0, NULL, 0);

  for (uint32_t i = 1; i < cpu_count (); i++)
    {
      work_pool_init (percpu_ptr (work_pool_cpu_main, i));
      work_pool_init (percpu_ptr (work_pool_cpu_highprio, i));
    }

  work_pool_init (&work_pool_main);
  work_pool_init (&work_pool_highprio);

  for (uint32_t i = 0; i < cpu_count (); i++)
    {
      work_pool_build (percpu_ptr (work_pool_cpu_main, i), i, 0);
      work_pool_build (percpu_ptr (work_pool_cpu_highprio, i), i,
                       WORK_PF_HIGHPRIO);
    }

  work_pool_build (&work_pool_main, WORK_INVALID_CPU, WORK_PF_GLOBAL);
  work_pool_build (&work_pool_highprio, WORK_INVALID_CPU,
                   WORK_PF_GLOBAL | WORK_PF_HIGHPRIO);

  log_info ("work: threads per pool (per-cpu/global): %u/%u, spare: %u",
            percpu_var (work_pool_cpu_main.max_threads, 0),
            work_pool_main.max_threads, WORK_THREADS_SPARE);
  return (0);
}

INIT_OP_DEFINE (work_setup,
                INIT_OP_DEP (cpu_mp_probe, true),
                INIT_OP_DEP (cpumap_setup, true),
                INIT_OP_DEP (kmem_setup, true),
                INIT_OP_DEP (log_setup, true),
                INIT_OP_DEP (spinlock_setup, true),
                INIT_OP_DEP (syscnt_setup, true),
                INIT_OP_DEP (thread_setup, true),
                INIT_OP_DEP (work_bootstrap, true));

void
work_schedule (struct work *work, int flags)
{
  THREAD_PIN_GUARD ();
  struct work_pool *pool = work_pool_cpu_select (flags);

  cpu_flags_t cpu_flags;
  work_pool_acquire (pool, &cpu_flags);
  work_pool_push_work (pool, work);
  work_pool_release (pool, cpu_flags);
}
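
/*
 * Typical usage sketch (assuming work_init() from <kern/work.h> binds the
 * callback to a caller-provided struct work, and that the caller keeps the
 * structure alive until the callback has run; foo_deferred and foo_work are
 * hypothetical names):
 *
 *   static void
 *   foo_deferred (struct work *work)
 *   {
 *     // Runs in a worker thread, with interrupts and preemption enabled.
 *   }
 *
 *   static struct work foo_work;
 *
 *   work_init (&foo_work, foo_deferred);
 *   work_schedule (&foo_work, 0);   // or WORK_HIGHPRIO
 */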

void
work_queue_schedule (struct work_queue *queue, int flags)
{
  THREAD_PIN_GUARD ();
  struct work_pool *pool = work_pool_cpu_select (flags);

  cpu_flags_t cpu_flags;
  work_pool_acquire (pool, &cpu_flags);
  work_pool_concat_queue (pool, queue);
  work_pool_release (pool, cpu_flags);
}

void
work_report_periodic_event (void)
{
  assert (thread_check_intr_context ());

  struct work_queue queue, highprio_queue;
  work_pool_shift_queues (cpu_local_ptr (work_pool_cpu_main), &queue);
  work_pool_shift_queues (cpu_local_ptr (work_pool_cpu_highprio),
                          &highprio_queue);

  if (work_queue_nr_works (&queue))
    {
      SPINLOCK_GUARD (&work_pool_main.lock);
      work_pool_concat_queue (&work_pool_main, &queue);
    }

  if (work_queue_nr_works (&highprio_queue))
    {
      SPINLOCK_GUARD (&work_pool_highprio.lock);
      work_pool_concat_queue (&work_pool_highprio, &highprio_queue);
    }
}