/*
 * Copyright (c) 2013-2014 Richard Braun.
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include <assert.h>
#include <errno.h>
#include <stdalign.h>
#include <stddef.h>
#include <stdio.h>

#include <kern/bitmap.h>
#include <kern/error.h>
#include <kern/init.h>
#include <kern/kmem.h>
#include <kern/list.h>
#include <kern/log.h>
#include <kern/macros.h>
#include <kern/panic.h>
#include <kern/percpu.h>
#include <kern/spinlock.h>
#include <kern/syscnt.h>
#include <kern/thread.h>
#include <kern/work.h>
#include <machine/cpu.h>

#define WORK_PRIO_NORMAL   THREAD_SCHED_FS_PRIO_DEFAULT
#define WORK_PRIO_HIGH     THREAD_SCHED_FS_PRIO_MAX

#define WORK_INVALID_CPU   ((unsigned int)-1)

// Keep at least this many threads alive when a work pool is idle.
#define WORK_THREADS_SPARE   4

/*
 * When computing the maximum number of worker threads, start by multiplying
 * the number of processors by the ratio below. If the result is greater than
 * the threshold, retry by decreasing the ratio until either the result is
 * less than the threshold or the ratio is 1.
 */
#define WORK_THREADS_RATIO       4
#define WORK_THREADS_THRESHOLD   512

#define WORK_MAX_THREADS   MAX (CONFIG_MAX_CPUS, WORK_THREADS_THRESHOLD)

// Work pool flags.
#define WORK_PF_GLOBAL     0x1   // System-wide work queue.
#define WORK_PF_HIGHPRIO   0x2   // High priority worker threads.
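
// Worker thread descriptor, linked into its pool's available or dead list.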
struct work_thread
{
  struct list node;
  struct thread *thread;
  struct work_pool *pool;
  uint32_t id;
};

/*
 * Pool of threads and works.
 *
 * Interrupts must be disabled when accessing a work pool. Holding the
 * lock is required for global pools only, whereas exclusive access on
 * per-processor pools is achieved by disabling preemption.
 *
 * There are two internal queues of pending works. When first scheduling
 * a work, it is inserted into queue0. After a periodic event, works still
 * present in queue0 are moved to queue1. If these works are still present
 * in queue1 at the next periodic event, it means they couldn't be processed
 * for a complete period between two periodic events, at which point it is
 * assumed that processing works on the same processor they were queued on
 * becomes less relevant. As a result, periodic events also trigger the
 * transfer of works from queue1 to the matching global pool. Global pools
 * only use one queue.
 */
struct work_pool
{
  __cacheline_aligned struct spinlock lock;
  int flags;
  struct work_queue queue0;
  struct work_queue queue1;
  struct work_thread *manager;
  struct syscnt sc_transfers;
  uint32_t cpu;
  uint32_t max_threads;
  uint32_t nr_threads;
  uint32_t nr_available_threads;
  struct list available_threads;
  struct list dead_threads;
  BITMAP_DECLARE (bitmap, WORK_MAX_THREADS);
};

static int work_thread_create (struct work_pool *pool, uint32_t id);

static struct work_pool work_pool_cpu_main __percpu;
static struct work_pool work_pool_cpu_highprio __percpu;
static struct work_pool work_pool_main;
static struct work_pool work_pool_highprio;

static struct kmem_cache work_thread_cache;
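
/*
 * Reserve a worker thread slot: account for the new thread and return a
 * unique ID taken from the pool bitmap. The caller must have exclusive
 * access to the pool (see work_pool_acquire).
 */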
static unsigned int
work_pool_alloc_id (struct work_pool *pool)
{
  assert (pool->nr_threads < pool->max_threads);
  ++pool->nr_threads;

  int bit = bitmap_find_first_zero (pool->bitmap, pool->max_threads);
  assert (bit >= 0);
  bitmap_set (pool->bitmap, bit);
  return (bit);
}

static void
work_pool_free_id (struct work_pool *pool, unsigned int id)
{
  assert (pool->nr_threads != 0);
  --pool->nr_threads;
  bitmap_clear (pool->bitmap, id);
}

static unsigned int
work_pool_cpu_id (const struct work_pool *pool)
{
  assert (!(pool->flags & WORK_PF_GLOBAL));
  return (pool->cpu);
}
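
/*
 * Compute the maximum number of worker threads for a pool, applying the
 * ratio/threshold policy described above WORK_THREADS_RATIO.
 */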
static unsigned int
work_pool_compute_max_threads (uint32_t nr_cpus)
{
  uint32_t ratio = WORK_THREADS_RATIO, max_threads = nr_cpus * ratio;

  for (; ratio > 1 && max_threads > WORK_THREADS_THRESHOLD;
       max_threads = nr_cpus * (--ratio))
    ;

  assert (max_threads);
  assert (max_threads <= WORK_MAX_THREADS);
  return (max_threads);
}

static void __init
work_pool_init (struct work_pool *pool)
{
  spinlock_init (&pool->lock);
  work_queue_init (&pool->queue0);
  work_queue_init (&pool->queue1);
  pool->manager = NULL;
}

static void __init
work_pool_build (struct work_pool *pool, uint32_t cpu, int flags)
{
  pool->flags = flags;

  uint32_t nr_cpus = 1;

  if (flags & WORK_PF_GLOBAL)
    {
      nr_cpus = cpu_count ();
      pool->cpu = WORK_INVALID_CPU;
    }
  else
    {
      const char *suffix = (flags & WORK_PF_HIGHPRIO) ? "h" : "";
      char name[SYSCNT_NAME_SIZE];
      snprintf (name, sizeof (name), "work_transfers/%u%s", cpu, suffix);
      syscnt_register (&pool->sc_transfers, name);
      pool->cpu = cpu;
    }

  pool->max_threads = work_pool_compute_max_threads (nr_cpus);
  pool->nr_threads = 0;
  pool->nr_available_threads = 0;
  list_init (&pool->available_threads);
  list_init (&pool->dead_threads);
  bitmap_zero (pool->bitmap, WORK_MAX_THREADS);

  uint32_t id = work_pool_alloc_id (pool);

  if (work_thread_create (pool, id) != 0)
    panic ("work: unable to create initial worker thread");
}

static struct work_pool*
work_pool_cpu_select (int flags)
{
  return ((flags & WORK_HIGHPRIO) ?
          cpu_local_ptr (work_pool_cpu_highprio) :
          cpu_local_ptr (work_pool_cpu_main));
}
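
/*
 * Exclusive access to a pool: global pools take the interrupt-safe spinlock,
 * per-processor pools only need preemption and interrupts disabled.
 */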
static void
work_pool_acquire (struct work_pool *pool, cpu_flags_t *flags)
{
  if (pool->flags & WORK_PF_GLOBAL)
    spinlock_lock_intr_save (&pool->lock, flags);
  else
    thread_preempt_disable_intr_save (flags);
}

static void
work_pool_release (struct work_pool *pool, cpu_flags_t flags)
{
  if (pool->flags & WORK_PF_GLOBAL)
    spinlock_unlock_intr_restore (&pool->lock, flags);
  else
    thread_preempt_enable_intr_restore (flags);
}

static int
work_pool_nr_works (const struct work_pool *pool)
{
  return (work_queue_nr_works (&pool->queue0) +
          work_queue_nr_works (&pool->queue1));
}

static struct work*
work_pool_pop_work (struct work_pool *pool)
{
  if (!(pool->flags & WORK_PF_GLOBAL) &&
      work_queue_nr_works (&pool->queue1) != 0)
    return (work_queue_pop (&pool->queue1));

  return (work_queue_pop (&pool->queue0));
}

static void
work_pool_wakeup_manager (struct work_pool *pool)
{
  if (work_pool_nr_works (pool) && pool->manager)
    thread_wakeup (pool->manager->thread);
}
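
/*
 * Periodic aging of a per-processor pool: queue1 is handed to the caller
 * (for transfer to the matching global pool) and queue0 becomes the new
 * queue1. The transfer counter is only bumped when works actually aged out.
 */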
static void
work_pool_shift_queues (struct work_pool *pool, struct work_queue *old_queue)
{
  assert (!(pool->flags & WORK_PF_GLOBAL));
  work_queue_transfer (old_queue, &pool->queue1);
  work_queue_transfer (&pool->queue1, &pool->queue0);
  work_queue_init (&pool->queue0);

  if (work_queue_nr_works (old_queue))
    syscnt_inc (&pool->sc_transfers);
}

static void
work_pool_push_work (struct work_pool *pool, struct work *work)
{
  work_queue_push (&pool->queue0, work);
  work_pool_wakeup_manager (pool);
}

static void
work_pool_concat_queue (struct work_pool *pool, struct work_queue *queue)
{
  work_queue_concat (&pool->queue0, queue);
  work_pool_wakeup_manager (pool);
}

static void
work_thread_destroy (struct work_thread *worker)
{
  thread_join (worker->thread);
  kmem_cache_free (&work_thread_cache, worker);
}
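
/*
 * Worker thread entry point. At most one thread per pool acts as the manager
 * at any time: it reaps dead workers, pops and processes works, and wakes or
 * creates additional workers when the queues aren't empty. The other threads
 * wait on the available list, and threads beyond the spare limit exit through
 * the dead list.
 */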
static void
work_process (void *arg)
{
  struct work_thread *self = arg;
  struct work_pool *pool = self->pool;
  struct spinlock *lock = (pool->flags & WORK_PF_GLOBAL) ? &pool->lock : NULL;

  cpu_flags_t flags;
  work_pool_acquire (pool, &flags);

  while (1)
    {
      if (pool->manager != NULL)
        {
          list_insert_tail (&pool->available_threads, &self->node);
          ++pool->nr_available_threads;

          do
            thread_sleep (lock, pool, "work_spr");
          while (pool->manager);

          list_remove (&self->node);
          --pool->nr_available_threads;
        }

      if (!list_empty (&pool->dead_threads))
        {
          _Auto worker = list_first_entry (&pool->dead_threads,
                                           struct work_thread, node);
          list_remove (&worker->node);
          work_pool_release (pool, flags);

          uint32_t id = worker->id;
          work_thread_destroy (worker);

          /*
           * Release worker ID last so that, if the pool is full, no new
           * worker can be created unless all the resources of the worker
           * being destroyed have been freed. This is important to enforce
           * a strict boundary on the total amount of resources allocated
           * for a pool at any time.
           */
          work_pool_acquire (pool, &flags);
          work_pool_free_id (pool, id);
          continue;
        }

      if (!work_pool_nr_works (pool))
        {
          if (pool->nr_threads > WORK_THREADS_SPARE)
            break;

          pool->manager = self;

          do
            thread_sleep (lock, pool, "work_mgr");
          while (!work_pool_nr_works (pool));

          pool->manager = NULL;
        }

      _Auto work = work_pool_pop_work (pool);

      if (work_pool_nr_works (pool))
        {
          if (pool->nr_available_threads != 0)
            {
              _Auto worker = list_first_entry (&pool->available_threads,
                                               struct work_thread, node);
              thread_wakeup (worker->thread);
            }
          else if (pool->nr_threads < pool->max_threads)
            {
              uint32_t id = work_pool_alloc_id (pool);
              work_pool_release (pool, flags);

              int error = work_thread_create (pool, id);
              work_pool_acquire (pool, &flags);

              if (error)
                {
                  work_pool_free_id (pool, id);
                  log_warning ("work: unable to create worker thread");
                }
            }
        }

      work_pool_release (pool, flags);
      work->fn (work);
      work_pool_acquire (pool, &flags);
    }

  list_insert_tail (&pool->dead_threads, &self->node);
  work_pool_release (pool, flags);
}
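
/*
 * Allocate and start a new worker thread. Per-processor pools bind the
 * thread to their processor; global pool threads may run anywhere.
 */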
static int
work_thread_create (struct work_pool *pool, uint32_t id)
{
  struct work_thread *worker = kmem_cache_alloc (&work_thread_cache);

  if (! worker)
    return (ENOMEM);

  worker->pool = pool;
  worker->id = id;

  const char *suffix;
  uint16_t priority;
  int error;

  if (pool->flags & WORK_PF_HIGHPRIO)
    {
      suffix = "h";
      priority = WORK_PRIO_HIGH;
    }
  else
    {
      suffix = "";
      priority = WORK_PRIO_NORMAL;
    }

  struct cpumap *cpumap;
  char name[THREAD_NAME_SIZE];

  if (pool->flags & WORK_PF_GLOBAL)
    {
      cpumap = NULL;
      snprintf (name, sizeof (name),
                THREAD_KERNEL_PREFIX "work_process/g:%u%s",
                worker->id, suffix);
    }
  else
    {
      error = cpumap_create (&cpumap);

      if (error)
        goto error_cpumap;

      uint32_t pool_id = work_pool_cpu_id (pool);
      cpumap_zero (cpumap);
      cpumap_set (cpumap, pool_id);
      snprintf (name, sizeof (name),
                THREAD_KERNEL_PREFIX "work_process/%u:%u%s",
                pool_id, worker->id, suffix);
    }

  struct thread_attr attr;
  thread_attr_init (&attr, name);
  thread_attr_set_priority (&attr, priority);

  if (cpumap)
    thread_attr_set_cpumap (&attr, cpumap);

  error = thread_create (&worker->thread, &attr, work_process, worker);

  if (cpumap)
    cpumap_destroy (cpumap);

  if (error)
    goto error_thread;

  return (0);

error_thread:
error_cpumap:
  kmem_cache_free (&work_thread_cache, worker);
  return (error);
}
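
// Early initialization of the boot processor's local pools.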
static int __init
work_bootstrap (void)
{
  work_pool_init (cpu_local_ptr (work_pool_cpu_main));
  work_pool_init (cpu_local_ptr (work_pool_cpu_highprio));
  return (0);
}

INIT_OP_DEFINE (work_bootstrap,
                INIT_OP_DEP (cpu_setup, true),
                INIT_OP_DEP (spinlock_setup, true),
                INIT_OP_DEP (thread_bootstrap, true));
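
/*
 * Full initialization: create the worker thread cache, initialize the
 * remaining per-processor pools and the global pools, then build them all,
 * spawning each pool's initial worker thread.
 */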
static int __init
work_setup (void)
{
  kmem_cache_init (&work_thread_cache, "work_thread",
                   sizeof (struct work_thread), 0, NULL, 0);

  for (uint32_t i = 1; i < cpu_count (); i++)
    {
      work_pool_init (percpu_ptr (work_pool_cpu_main, i));
      work_pool_init (percpu_ptr (work_pool_cpu_highprio, i));
    }

  work_pool_init (&work_pool_main);
  work_pool_init (&work_pool_highprio);

  for (uint32_t i = 0; i < cpu_count (); i++)
    {
      work_pool_build (percpu_ptr (work_pool_cpu_main, i), i, 0);
      work_pool_build (percpu_ptr (work_pool_cpu_highprio, i), i,
                       WORK_PF_HIGHPRIO);
    }

  work_pool_build (&work_pool_main, WORK_INVALID_CPU, WORK_PF_GLOBAL);
  work_pool_build (&work_pool_highprio, WORK_INVALID_CPU,
                   WORK_PF_GLOBAL | WORK_PF_HIGHPRIO);

  log_info ("work: threads per pool (per-cpu/global): %u/%u, spare: %u",
            percpu_var (work_pool_cpu_main.max_threads, 0),
            work_pool_main.max_threads, WORK_THREADS_SPARE);
  return (0);
}

INIT_OP_DEFINE (work_setup,
                INIT_OP_DEP (cpu_mp_probe, true),
                INIT_OP_DEP (cpumap_setup, true),
                INIT_OP_DEP (kmem_setup, true),
                INIT_OP_DEP (log_setup, true),
                INIT_OP_DEP (spinlock_setup, true),
                INIT_OP_DEP (syscnt_setup, true),
                INIT_OP_DEP (thread_setup, true),
                INIT_OP_DEP (work_bootstrap, true));
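
/*
 * Schedule a work item on the local processor's pool.
 *
 * A minimal usage sketch (not part of this file), assuming the work_init()
 * helper and the WORK_HIGHPRIO flag declared in kern/work.h:
 *
 *   static void
 *   my_deferred_fn (struct work *work)
 *   {
 *       // Runs later in a worker thread, outside interrupt context.
 *   }
 *
 *   static struct work my_work;
 *
 *   void
 *   my_interrupt_handler (void)
 *   {
 *       work_init (&my_work, my_deferred_fn);
 *       work_schedule (&my_work, 0);   // or WORK_HIGHPRIO
 *   }
 */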
void
work_schedule (struct work *work, int flags)
{
  THREAD_PIN_GUARD ();
  struct work_pool *pool = work_pool_cpu_select (flags);

  cpu_flags_t cpu_flags;
  work_pool_acquire (pool, &cpu_flags);
  work_pool_push_work (pool, work);
  work_pool_release (pool, cpu_flags);
}

void
work_queue_schedule (struct work_queue *queue, int flags)
{
  THREAD_PIN_GUARD ();
  struct work_pool *pool = work_pool_cpu_select (flags);

  cpu_flags_t cpu_flags;
  work_pool_acquire (pool, &cpu_flags);
  work_pool_concat_queue (pool, queue);
  work_pool_release (pool, cpu_flags);
}
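
/*
 * Called from interrupt context on each periodic event: age the local
 * per-processor queues and move works that have waited a full period to
 * the matching global pools.
 */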
void
work_report_periodic_event (void)
{
  assert (thread_check_intr_context ());

  struct work_queue queue, highprio_queue;
  work_pool_shift_queues (cpu_local_ptr (work_pool_cpu_main), &queue);
  work_pool_shift_queues (cpu_local_ptr (work_pool_cpu_highprio),
                          &highprio_queue);

  if (work_queue_nr_works (&queue))
    {
      SPINLOCK_GUARD (&work_pool_main.lock);
      work_pool_concat_queue (&work_pool_main, &queue);
    }

  if (work_queue_nr_works (&highprio_queue))
    {
      SPINLOCK_GUARD (&work_pool_highprio.lock);
      work_pool_concat_queue (&work_pool_highprio, &highprio_queue);
    }
}