scheduler.scm 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399
  1. ;; Fibers: cooperative, event-driven user-space threads.
  2. ;;;; Copyright (C) 2016 Free Software Foundation, Inc.
  3. ;;;;
  4. ;;;; This library is free software; you can redistribute it and/or
  5. ;;;; modify it under the terms of the GNU Lesser General Public
  6. ;;;; License as published by the Free Software Foundation; either
  7. ;;;; version 3 of the License, or (at your option) any later version.
  8. ;;;;
  9. ;;;; This library is distributed in the hope that it will be useful,
  10. ;;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. ;;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  12. ;;;; Lesser General Public License for more details.
  13. ;;;;
  14. ;;;; You should have received a copy of the GNU Lesser General Public
  15. ;;;; License along with this library; if not, write to the Free Software
  16. ;;;; Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  17. ;;;;
  18. (define-module (fibers scheduler)
  19. #:use-module (srfi srfi-9)
  20. #:use-module (fibers stack)
  21. #:use-module (fibers epoll)
  22. #:use-module (fibers psq)
  23. #:use-module (ice-9 atomic)
  24. #:use-module (ice-9 control)
  25. #:use-module (ice-9 match)
  26. #:use-module (ice-9 fdes-finalizers)
  27. #:use-module ((ice-9 threads) #:select (current-thread))
  28. #:export (;; Low-level interface: schedulers and tasks.
  29. make-scheduler
  30. (current-scheduler/public . current-scheduler)
  31. scheduler-runcount
  32. (scheduler-kernel-thread/public . scheduler-kernel-thread)
  33. scheduler-remote-peers
  34. scheduler-work-pending?
  35. choose-parallel-scheduler
  36. run-scheduler
  37. destroy-scheduler
  38. schedule-task
  39. schedule-task-when-fd-readable
  40. schedule-task-when-fd-writable
  41. schedule-task-at-time
  42. suspend-current-task
  43. yield-current-task))
;; Record holding all of the state of one scheduler.  Instances are
;; built by `make-scheduler' via the private constructor
;; `%make-scheduler'.  Only `timers', `remote-peers' and
;; `choose-parallel-scheduler' have setters; the other fields are fixed
;; at construction time (though the objects they hold are mutated).
(define-record-type <scheduler>
  (%make-scheduler epfd runcount-box prompt-tag
                   next-runqueue current-runqueue
                   fd-waiters timers kernel-thread
                   remote-peers choose-parallel-scheduler)
  scheduler?
  ;; epoll handle, as returned by `epoll-create'
  (epfd scheduler-epfd)
  ;; atomic variable of uint32
  (runcount-box scheduler-runcount-box)
  ;; prompt tag that tasks are run under and that suspending tasks
  ;; abort to
  (prompt-tag scheduler-prompt-tag)
  ;; atomic stack of tasks to run next turn (reverse order)
  (next-runqueue scheduler-next-runqueue)
  ;; atomic stack of tasks to run this turn
  (current-runqueue scheduler-current-runqueue)
  ;; fd -> (total-events (events . task) ...)
  ;; A hash table keyed by fd; total-events is #f while the fd is not
  ;; armed in the epoll set.
  (fd-waiters scheduler-fd-waiters)
  ;; PSQ of expiry -> task
  ;; Keys are (expiry . task) pairs and the priority is the expiry.
  (timers scheduler-timers set-scheduler-timers!)
  ;; atomic parameter of thread
  ;; Holds #f while no kernel thread is running this scheduler.
  (kernel-thread scheduler-kernel-thread)
  ;; list of sched
  ;; The other schedulers created together with this one.
  (remote-peers scheduler-remote-peers set-scheduler-remote-peers!)
  ;; () -> sched
  (choose-parallel-scheduler scheduler-choose-parallel-scheduler
                             set-scheduler-choose-parallel-scheduler!))
  69. (define (make-atomic-parameter init)
  70. (let ((box (make-atomic-box init)))
  71. (case-lambda
  72. (() (atomic-box-ref box))
  73. ((new)
  74. (if (eq? new init)
  75. (atomic-box-set! box new)
  76. (let ((prev (atomic-box-compare-and-swap! box init new)))
  77. (unless (eq? prev init)
  78. (error "owned by other thread" prev))))))))
  79. (define (shuffle l)
  80. (map cdr (sort (map (lambda (x) (cons (random 1.0) x)) l)
  81. (lambda (a b) (< (car a) (car b))))))
  82. (define (make-selector items)
  83. (let ((items (list->vector (shuffle items))))
  84. (match (vector-length items)
  85. (0 (lambda () #f))
  86. (1 (let ((item (vector-ref items 0))) (lambda () item)))
  87. (n (let ((idx 0))
  88. (lambda ()
  89. (let ((item (vector-ref items idx)))
  90. (set! idx (let ((idx (1+ idx)))
  91. (if (= idx (vector-length items)) 0 idx)))
  92. item)))))))
  93. (define* (make-scheduler #:key parallelism
  94. (prompt-tag (make-prompt-tag "fibers")))
  95. "Make a new scheduler in which to run fibers."
  96. (let ((epfd (epoll-create))
  97. (runcount-box (make-atomic-box 0))
  98. (next-runqueue (make-empty-stack))
  99. (current-runqueue (make-empty-stack))
  100. (fd-waiters (make-hash-table))
  101. (timers (make-psq (match-lambda*
  102. (((t1 . c1) (t2 . c2)) (< t1 t2)))
  103. <))
  104. (kernel-thread (make-atomic-parameter #f)))
  105. (let* ((sched (%make-scheduler epfd runcount-box prompt-tag
  106. next-runqueue current-runqueue
  107. fd-waiters timers kernel-thread
  108. #f #f))
  109. (all-scheds
  110. (cons sched
  111. (if parallelism
  112. (map (lambda (_)
  113. (make-scheduler #:prompt-tag prompt-tag))
  114. (iota (1- parallelism)))
  115. '()))))
  116. (for-each
  117. (lambda (sched)
  118. (let ((choose! (make-selector all-scheds)))
  119. (set-scheduler-remote-peers! sched (delq sched all-scheds))
  120. (set-scheduler-choose-parallel-scheduler! sched choose!)))
  121. all-scheds)
  122. sched)))
;; Parameter holding the scheduler that is current on this kernel
;; thread.  It is backed by a thread-local fluid whose default is #f;
;; `with-scheduler' binds it for the duration of its body.
(define current-scheduler (fluid->parameter (make-thread-local-fluid #f)))
;; Read-only accessor for the `current-scheduler' parameter.  The
;; module exports this procedure under the name `current-scheduler', so
;; callers outside the module cannot use the parameter to rebind it.
(define (current-scheduler/public)
  "Return the current scheduler, or @code{#f} if no scheduler is current."
  (current-scheduler))
  127. (define-syntax-rule (with-scheduler scheduler body ...)
  128. "Evaluate @code{(begin @var{body} ...)} in an environment in which
  129. @var{scheduler} is bound to the current kernel thread and
  130. @code{current-scheduler} is bound to @var{scheduler}. Signal an error
  131. if @var{scheduler} is already running in some other kernel thread."
  132. (let ((sched scheduler))
  133. (dynamic-wind (lambda ()
  134. ((scheduler-kernel-thread sched) (current-thread)))
  135. (lambda ()
  136. (parameterize ((current-scheduler sched))
  137. body ...))
  138. (lambda ()
  139. ((scheduler-kernel-thread sched) #f)))))
  140. (define (scheduler-runcount sched)
  141. "Return the number of tasks that have been run on @var{sched} since
  142. it was started, modulo 2@sup{32}."
  143. (atomic-box-ref (scheduler-runcount-box sched)))
  144. (define (scheduler-kernel-thread/public sched)
  145. "Return the kernel thread on which @var{sched} is running, or
  146. @code{#f} if @var{sched} is not running."
  147. ((scheduler-kernel-thread sched)))
  148. (define (choose-parallel-scheduler sched)
  149. ((scheduler-choose-parallel-scheduler sched)))
;; Push TASK onto SCHED's next-turn run queue without waking SCHED up.
;; `schedule-task' is the variant that also wakes a scheduler running
;; on a different kernel thread.
(define-inlinable (schedule-task/no-wakeup sched task)
  (stack-push! (scheduler-next-runqueue sched) task))
  152. (define (schedule-task sched task)
  153. "Add the task @var{task} to the run queue of the scheduler
  154. @var{sched}. On the next turn, @var{sched} will invoke @var{task}
  155. with no arguments.
  156. This function is thread-safe even if @var{sched} is running on a
  157. remote kernel thread."
  158. (schedule-task/no-wakeup sched task)
  159. (unless (eq? ((scheduler-kernel-thread sched)) (current-thread))
  160. (epoll-wake! (scheduler-epfd sched)))
  161. (values))
  162. (define (schedule-tasks-for-active-fd fd revents sched)
  163. (match (hashv-ref (scheduler-fd-waiters sched) fd)
  164. (#f (warn "scheduler for unknown fd" fd))
  165. ((and events+waiters (active-events . waiters))
  166. ;; First, clear the active status, as the EPOLLONESHOT has
  167. ;; deactivated our entry in the epoll set.
  168. (set-car! events+waiters #f)
  169. (set-cdr! events+waiters '())
  170. (unless (zero? (logand revents EPOLLERR))
  171. (hashv-remove! (scheduler-fd-waiters sched) fd))
  172. ;; Now resume or re-schedule waiters, as appropriate.
  173. (let lp ((waiters waiters))
  174. (match waiters
  175. (() #f)
  176. (((events . task) . waiters)
  177. (if (zero? (logand revents (logior events EPOLLERR)))
  178. ;; Re-schedule.
  179. (schedule-task-when-fd-active sched fd events task)
  180. ;; Resume.
  181. (schedule-task/no-wakeup sched task))
  182. (lp waiters)))))))
  183. (define (schedule-tasks-for-expired-timers sched)
  184. ;; Schedule expired timer tasks in the order that they expired.
  185. (let ((now (get-internal-real-time)))
  186. (let expire-timers ((timers (scheduler-timers sched)))
  187. (cond
  188. ((or (psq-empty? timers)
  189. (< now (car (psq-min timers))))
  190. (set-scheduler-timers! sched timers))
  191. (else
  192. (call-with-values (lambda () (psq-pop timers))
  193. (match-lambda*
  194. (((_ . task) timers)
  195. (schedule-task/no-wakeup sched task)
  196. (expire-timers timers)))))))))
  197. (define (schedule-tasks-for-next-turn sched)
  198. ;; Called when all tasks from the current turn have been run.
  199. ;; Note that there may be tasks already scheduled for the next
  200. ;; turn; one way this can happen is if a fiber suspended itself
  201. ;; because it was blocked on a channel, but then another fiber woke
  202. ;; it up, or if a remote thread scheduled a fiber on this scheduler.
  203. ;; In any case, check the kernel to see if any of the fd's that we
  204. ;; are interested in are active, and in that case schedule their
  205. ;; corresponding tasks. Also run any timers that have timed out.
  206. (define (timers-expiry timers)
  207. (and (not (psq-empty? timers))
  208. (match (psq-min timers)
  209. ((expiry . task)
  210. expiry))))
  211. (define (update-expiry expiry)
  212. ;; If there are pending tasks, cause epoll to return
  213. ;; immediately.
  214. (if (stack-empty? (scheduler-next-runqueue sched))
  215. expiry
  216. 0))
  217. (epoll (scheduler-epfd sched)
  218. #:expiry (timers-expiry (scheduler-timers sched))
  219. #:update-expiry update-expiry
  220. #:folder (lambda (fd revents sched)
  221. (schedule-tasks-for-active-fd fd revents sched)
  222. sched)
  223. #:seed sched)
  224. (schedule-tasks-for-expired-timers sched))
  225. (define (work-stealer sched)
  226. "Steal some work from a random scheduler in the vector
  227. @var{schedulers}. Return a task, or @code{#f} if no work could be
  228. stolen."
  229. (let ((selector (make-selector (scheduler-remote-peers sched))))
  230. (lambda ()
  231. (let ((peer (selector)))
  232. (and peer
  233. (stack-pop! (scheduler-current-runqueue peer) #f))))))
  234. (define (scheduler-work-pending? sched)
  235. "Return @code{#t} if @var{sched} has any work pending: any tasks or
  236. any pending timeouts."
  237. (not (and (psq-empty? (scheduler-timers sched))
  238. (stack-empty? (scheduler-current-runqueue sched))
  239. (stack-empty? (scheduler-next-runqueue sched)))))
;; Main loop of a scheduler.  A "turn" drains the current run queue;
;; between turns, `schedule-tasks-for-next-turn' polls for active file
;; descriptors and expired timers, and the next-turn queue is moved
;; over to the current queue.
;;
;; NOTE(review): the docstring promises zero values, but the value
;; that propagates out appears to be that of the final `unless' in
;; `next-turn' -- confirm whether callers rely on either behaviour.
(define* (run-scheduler sched finished?)
  "Run @var{sched} until calling @code{finished?} returns a true
value. Return zero values."
  (let ((tag (scheduler-prompt-tag sched))
        (runcount-box (scheduler-runcount-box sched))
        (next (scheduler-next-runqueue sched))
        (cur (scheduler-current-runqueue sched))
        (steal-work! (work-stealer sched)))
    ;; Bump the run counter (kept within 32 bits) and call TASK inside
    ;; a prompt on TAG.  If the task aborts to TAG with an
    ;; AFTER-SUSPEND procedure, that procedure is called with this
    ;; scheduler and the task's continuation.
    (define (run-task task)
      (atomic-box-set! runcount-box
                       (logand (1+ (atomic-box-ref runcount-box)) #xffffFFFF))
      (call-with-prompt tag
        task
        (lambda (k after-suspend)
          (after-suspend sched k))))
    ;; Run tasks from the current queue until it is empty, then move on
    ;; to the next turn.
    (define (next-task)
      (match (stack-pop! cur #f)
        (#f
         (when (stack-empty? next)
           ;; Both current and next runqueues are empty; steal a
           ;; little bit of work from a remote scheduler if we
           ;; can. Run it directly instead of pushing onto a
           ;; queue to avoid double stealing.
           (let ((task (steal-work!)))
             (when task
               (run-task task))))
         (next-turn))
        (task
         (run-task task)
         (next-task))))
    ;; Unless FINISHED? says to stop, collect newly runnable tasks and
    ;; start draining again.  The next-turn stack is reversed before
    ;; being pushed onto the current queue.
    (define (next-turn)
      (unless (finished?)
        (schedule-tasks-for-next-turn sched)
        (stack-push-list! cur (reverse (stack-pop-all! next)))
        (next-task)))
    ;; Run the loop, catching every exception.  The pre-unwind handler
    ;; (the last argument to `catch') prints a backtrace and the
    ;; exception to the error port that was current when the handler
    ;; was installed; the ordinary handler then restarts the loop.
    (define (run-scheduler/error-handling)
      (catch #t
        next-task
        (lambda _ (run-scheduler/error-handling))
        (let ((err (current-error-port)))
          (lambda (key . args)
            ;; Errors raised while reporting are deliberately ignored.
            (false-if-exception
             (let ((stack (make-stack #t 4 tag)))
               (format err "Uncaught exception in task:\n")
               ;; FIXME: Guile's display-backtrace isn't respecting
               ;; stack narrowing; manually passing stack-length as
               ;; depth is a workaround.
               (display-backtrace stack err 0 (stack-length stack))
               (print-exception err (stack-ref stack 0)
                                key args)))))))
    (with-scheduler sched (run-scheduler/error-handling))))
  291. (define (destroy-scheduler sched)
  292. "Release any resources associated with @var{sched}."
  293. (epoll-destroy (scheduler-epfd sched)))
  294. (define (schedule-task-when-fd-active sched fd events task)
  295. "Arrange for @var{sched} to schedule @var{task} when the file
  296. descriptor @var{fd} becomes active with any of the given @var{events},
  297. expressed as an epoll bitfield."
  298. (let ((fd-waiters (hashv-ref (scheduler-fd-waiters sched) fd)))
  299. (match fd-waiters
  300. ((active-events . waiters)
  301. (set-cdr! fd-waiters (acons events task waiters))
  302. (unless (and active-events
  303. (= (logand events active-events) events))
  304. (let ((active-events (logior events (or active-events 0))))
  305. (set-car! fd-waiters active-events)
  306. (epoll-add*! (scheduler-epfd sched) fd
  307. (logior active-events EPOLLONESHOT)))))
  308. (#f
  309. (let ((fd-waiters (list events (cons events task))))
  310. (define (finalize-fd fd)
  311. ;; When a file port is closed, clear out the list of
  312. ;; waiting tasks so that when/if this FD is re-used, we
  313. ;; don't resume stale tasks. Note that we don't need to
  314. ;; remove the FD from the epoll set, as the kernel manages
  315. ;; that for us.
  316. ;;
  317. ;; FIXME: Is there a way to wake all tasks in a thread-safe
  318. ;; way? Note that this function may be invoked from a
  319. ;; finalizer thread.
  320. (set-cdr! fd-waiters '())
  321. (set-car! fd-waiters #f))
  322. (hashv-set! (scheduler-fd-waiters sched) fd fd-waiters)
  323. (add-fdes-finalizer! fd finalize-fd)
  324. (epoll-add*! (scheduler-epfd sched) fd
  325. (logior events EPOLLONESHOT)))))))
  326. (define (schedule-task-when-fd-readable sched fd task)
  327. "Arrange to schedule @var{task} on @var{sched} when the file
  328. descriptor @var{fd} becomes readable."
  329. (schedule-task-when-fd-active sched fd (logior EPOLLIN EPOLLRDHUP) task))
  330. (define (schedule-task-when-fd-writable sched fd task)
  331. "Arrange to schedule @var{k} on @var{sched} when the file descriptor
  332. @var{fd} becomes writable."
  333. (schedule-task-when-fd-active sched fd EPOLLOUT task))
  334. (define (schedule-task-at-time sched expiry task)
  335. "Arrange to schedule @var{task} when the absolute real time is
  336. greater than or equal to @var{expiry}, expressed in internal time
  337. units."
  338. (set-scheduler-timers! sched
  339. (psq-set (scheduler-timers sched)
  340. (cons expiry task)
  341. expiry)))
;; Shim for Guile 2.1.5.
;; If `suspendable-continuation?' is not already defined when this
;; module is loaded, install a stand-in that answers #t for every
;; prompt tag.  With the stand-in, the checks in `suspend-current-task'
;; and `yield-current-task' always pass.
(unless (defined? 'suspendable-continuation?)
  (define! 'suspendable-continuation? (lambda (tag) #t)))
  345. (define* (suspend-current-task after-suspend)
  346. "Suspend the current task. After suspending, call the
  347. @var{after-suspend} callback with two arguments: the current
  348. scheduler, and the continuation of the current task. The continuation
  349. passed to the @var{after-suspend} handler is the continuation of the
  350. @code{suspend-current-task} call."
  351. (let ((tag (scheduler-prompt-tag (current-scheduler))))
  352. (unless (suspendable-continuation? tag)
  353. (error "Attempt to suspend fiber within continuation barrier"))
  354. (abort-to-prompt tag after-suspend)))
  355. (define* (yield-current-task)
  356. "Yield control to the current scheduler. Like calling
  357. @code{(suspend-current-task schedule-task)} except that it avoids
  358. suspending if the current continuation isn't suspendable. Returns
  359. @code{#t} if the yield succeeded, or @code{#f} otherwise."
  360. (match (current-scheduler)
  361. (#f #f)
  362. (sched
  363. (let ((tag (scheduler-prompt-tag sched)))
  364. (and (suspendable-continuation? tag)
  365. (begin
  366. (abort-to-prompt tag schedule-task)
  367. #t))))))