fibers.scm 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. ;; Fibers: cooperative, event-driven user-space threads.
  2. ;;;; Copyright (C) 2016 Free Software Foundation, Inc.
  3. ;;;;
  4. ;;;; This library is free software; you can redistribute it and/or
  5. ;;;; modify it under the terms of the GNU Lesser General Public
  6. ;;;; License as published by the Free Software Foundation; either
  7. ;;;; version 3 of the License, or (at your option) any later version.
  8. ;;;;
  9. ;;;; This library is distributed in the hope that it will be useful,
  10. ;;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. ;;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  12. ;;;; Lesser General Public License for more details.
  13. ;;;;
  14. ;;;; You should have received a copy of the GNU Lesser General Public
  15. ;;;; License along with this library; if not, write to the Free Software
  16. ;;;; Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  17. ;;;;
  18. (define-module (fibers)
  19. #:use-module (ice-9 match)
  20. #:use-module (ice-9 atomic)
  21. #:use-module (fibers scheduler)
  22. #:use-module (fibers repl)
  23. #:use-module (fibers timers)
  24. #:use-module (fibers interrupts)
  25. #:use-module (ice-9 threads)
  26. #:use-module ((ice-9 ports internal)
  27. #:select (port-read-wait-fd port-write-wait-fd))
  28. #:use-module (ice-9 suspendable-ports)
  29. #:export (run-fibers spawn-fiber)
  30. #:re-export (sleep))
  (define (wait-for-readable port)
    "Suspend the current task until @var{port} can be read from.
  The task is handed to @code{schedule-task-when-fd-readable} together
  with the file descriptor reported by @code{port-read-wait-fd}."
    ;; Called by `suspend-current-task' with the scheduler and a resume
    ;; object (presumably the suspended task's continuation -- confirm in
    ;; (fibers scheduler)).
    (define (park-until-readable sched resume)
      (schedule-task-when-fd-readable sched
                                      (port-read-wait-fd port)
                                      resume))
    (suspend-current-task park-until-readable))
  (define (wait-for-writable port)
    "Suspend the current task until @var{port} can be written to.
  The task is handed to @code{schedule-task-when-fd-writable} together
  with the file descriptor reported by @code{port-write-wait-fd}."
    ;; Called by `suspend-current-task' with the scheduler and a resume
    ;; object (presumably the suspended task's continuation -- confirm in
    ;; (fibers scheduler)).
    (define (park-until-writable sched resume)
      (schedule-task-when-fd-writable sched
                                      (port-write-wait-fd port)
                                      resume))
    (suspend-current-task park-until-writable))
  ;; Evaluate EXP ... with the CPU affinity of PID 0 (the caller) set to
  ;; AFFINITY, and put the previous affinity back afterwards.
  ;;
  ;; The save/set and restore steps are the entry and exit guards of a
  ;; `dynamic-wind', so the previous mask is restored on non-local exit
  ;; as well, and it is re-sampled each time the body is (re-)entered.
  (define-syntax-rule (with-affinity affinity exp ...)
    (let* ((previous-affinity #f)
           (enter (lambda ()
                    ;; Remember the mask in force right now, then switch.
                    (set! previous-affinity (getaffinity 0))
                    (setaffinity 0 affinity)))
           (leave (lambda ()
                    (setaffinity 0 previous-affinity))))
      (dynamic-wind enter
                    (lambda () exp ...)
                    leave)))
  (define (%run-fibers scheduler hz finished? affinity)
    "Run @var{scheduler} on the calling thread until @var{finished?}
  says to stop, with the thread's CPU affinity set to @var{affinity}
  for the duration.  @var{hz} is forwarded to @code{with-interrupts}."
    ;; Predicate handed to `with-interrupts': true when the scheduler's
    ;; run count has not changed since the previous call.  The last seen
    ;; count is updated on every call.
    (define run-count-unchanged?
      (let ((seen-runcount 0))
        (lambda ()
          (let* ((now (scheduler-runcount scheduler))
                 (unchanged? (eqv? now seen-runcount)))
            (set! seen-runcount now)
            unchanged?))))
    (define (run)
      (run-scheduler scheduler finished?))
    (with-affinity affinity
      ;; NOTE(review): presumably `with-interrupts' calls the predicate
      ;; at HZ and invokes `yield-current-task' when it returns true --
      ;; confirm in (fibers interrupts).
      (with-interrupts hz run-count-unchanged? yield-current-task run)))
  (define (start-auxiliary-threads scheduler hz finished? affinities)
    "Start one new thread per remote peer of @var{scheduler}.  Each
  thread runs its peer via @code{%run-fibers} with the shared @var{hz}
  and @var{finished?}, using the element of @var{affinities} at the
  same position as the peer."
    (define (launch-peer! peer peer-affinity)
      (call-with-new-thread
       (lambda ()
         (%run-fibers peer hz finished? peer-affinity))))
    ;; Peers and affinities are walked in lockstep.
    (for-each launch-peer!
              (scheduler-remote-peers scheduler)
              affinities))
  (define (stop-auxiliary-threads scheduler)
    "Cancel and then join the kernel thread of every remote peer of
  @var{scheduler}.  Peers for which @code{scheduler-kernel-thread}
  returns @code{#f} are skipped."
    (define (halt-peer! peer)
      (and=> (scheduler-kernel-thread peer)
             (lambda (thread)
               ;; Cancel first, then wait for the thread to exit.
               (cancel-thread thread)
               (join-thread thread))))
    (for-each halt-peer! (scheduler-remote-peers scheduler)))
  (define (compute-affinities group-affinity parallelism)
    "Return a list of CPU affinity bitvectors, one per scheduler thread.

  @var{group-affinity} is a bitvector whose set bits mark the usable
  CPUs; @var{parallelism} is the number of threads.  When
  @var{parallelism} equals the number of set bits, each thread gets its
  own bitvector with exactly one of those bits set, in ascending CPU
  order.  Otherwise the result is @var{parallelism} references to
  @var{group-affinity} itself (the same object, not copies)."
    (define (each-thread-has-group-affinity)
      (make-list parallelism group-affinity))
    (define (one-thread-per-cpu)
      ;; Walk the set bits of GROUP-AFFINITY from index 0 upwards.
      (let lp ((cpu 0))
        (match (bit-position #t group-affinity cpu)
          (#f '())
          (cpu (let ((affinity
                      (make-bitvector (bitvector-length group-affinity) #f)))
                 (bitvector-set! affinity cpu #t)
                 (cons affinity (lp (1+ cpu))))))))
    (let ((cpu-count (bit-count #t group-affinity)))
      ;; Compare numerically: `eq?' on numbers tests object identity,
      ;; which Scheme leaves unspecified for integers.
      (if (= parallelism cpu-count)
          (one-thread-per-cpu)
          (each-thread-has-group-affinity))))
  (define* (run-fibers #:optional (init #f)
                       #:key (hz 100) (scheduler #f)
                       (parallelism (current-processor-count))
                       (cpus (getaffinity 0))
                       (install-suspendable-ports? #t)
                       (drain? #f))
    "Run fibers on the calling thread.

  If @var{scheduler} is given, @var{init} (when provided) is spawned on
  it and the scheduler is run with a completion predicate that is never
  true, using @var{cpus} as the thread's affinity.

  Otherwise @var{init} is required.  A new scheduler is created with
  @var{parallelism}, @var{init} is spawned on it, auxiliary threads are
  started for the scheduler's remote peers, and the scheduler runs until
  @var{init} has returned.  If @var{drain?} is true it additionally
  keeps running until @code{scheduler-work-pending?} is false.  The
  auxiliary threads are then stopped, the scheduler is destroyed, and
  the values returned by @var{init} are returned.

  @var{hz} is forwarded to @code{%run-fibers}.  @var{cpus} is the
  affinity bitvector from which per-thread affinities are derived.
  Unless @var{install-suspendable-ports?} is false,
  @code{install-suspendable-ports!} is called first.

  An error is signalled if no @var{scheduler} and no @var{init} are
  given."
    (when install-suspendable-ports? (install-suspendable-ports!))
    (cond
     (scheduler
      ;; Caller-owned scheduler: never report completion ourselves.
      (let ((finished? (lambda () #f)))
        (when init (spawn-fiber init scheduler))
        (%run-fibers scheduler hz finished? cpus)))
     (else
      ;; Validate before creating anything, so a missing INIT does not
      ;; leave behind a scheduler that is never passed to
      ;; `destroy-scheduler'.
      (unless init
        (error "run-fibers requires initial fiber thunk when creating sched"))
      (let* ((scheduler (make-scheduler #:parallelism parallelism))
             ;; Holds the list of INIT's return values once it is done;
             ;; #f until then.
             (ret (make-atomic-box #f))
             (finished? (lambda ()
                          (and (atomic-box-ref ret)
                               (or (not drain?)
                                   (not (scheduler-work-pending? scheduler))))))
             (affinities (compute-affinities cpus parallelism)))
        (spawn-fiber (lambda ()
                       (call-with-values init
                         (lambda vals (atomic-box-set! ret vals)))
                       ;; Could be that this fiber was migrated away.
                       ;; Make sure to wake up the main scheduler.
                       (spawn-fiber (lambda () #t) scheduler))
                     scheduler)
        ;; The first affinity is for this thread; the rest go to the
        ;; auxiliary threads.
        (match affinities
          ((affinity . affinities)
           (dynamic-wind
             (lambda ()
               (start-auxiliary-threads scheduler hz finished? affinities))
             (lambda ()
               (%run-fibers scheduler hz finished? affinity))
             (lambda ()
               (stop-auxiliary-threads scheduler)))))
        (destroy-scheduler scheduler)
        (apply values (atomic-box-ref ret))))))
  (define* (spawn-fiber thunk #:optional scheduler #:key parallel?)
    "Create a new fiber whose entry point is @var{thunk}.
  The fiber is queued for a later scheduler turn rather than run
  immediately.  @var{thunk} runs under a copy of the dynamic state in
  effect at the time of this call, so changes it makes to fluids and
  parameters stay local to the fiber.

  With an explicit @var{scheduler} the fiber is queued there.  Without
  one it is queued on the current scheduler, or, when @var{parallel?}
  is true, on the scheduler picked by @code{choose-parallel-scheduler}.
  An error is signalled if there is no current scheduler either."
    ;; Wrap PROC so it runs under the dynamic state captured right now.
    (define (under-captured-state proc)
      (let ((state (current-dynamic-state)))
        (lambda ()
          (with-dynamic-state state proc))))
    (define (enqueue! target proc)
      (schedule-task target (under-captured-state proc)))
    (if scheduler
        ;; An explicit scheduler may be given from outside any fiber.
        ;; The captured dynamic state then probably lacks the right
        ;; current-read-waiter / current-write-waiter, so install them
        ;; before calling THUNK.
        (enqueue! scheduler
                  (lambda ()
                    (current-read-waiter wait-for-readable)
                    (current-write-waiter wait-for-writable)
                    (thunk)))
        (let ((current (current-scheduler)))
          (cond
           ((not current)
            (error "No scheduler current; call within run-fibers instead"))
           (parallel?
            (enqueue! (choose-parallel-scheduler current) thunk))
           (else
            (enqueue! current thunk))))))
  160. (else
  161. (error "No scheduler current; call within run-fibers instead"))))