;;; workers.scm
  1. ;;; GNU Guix --- Functional package management for GNU
  2. ;;; Copyright © 2017 Ludovic Courtès <ludo@gnu.org>
  3. ;;;
  4. ;;; This file is part of GNU Guix.
  5. ;;;
  6. ;;; GNU Guix is free software; you can redistribute it and/or modify it
  7. ;;; under the terms of the GNU General Public License as published by
  8. ;;; the Free Software Foundation; either version 3 of the License, or (at
  9. ;;; your option) any later version.
  10. ;;;
  11. ;;; GNU Guix is distributed in the hope that it will be useful, but
  12. ;;; WITHOUT ANY WARRANTY; without even the implied warranty of
  13. ;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. ;;; GNU General Public License for more details.
  15. ;;;
  16. ;;; You should have received a copy of the GNU General Public License
  17. ;;; along with GNU Guix. If not, see <http://www.gnu.org/licenses/>.
  18. (define-module (guix workers)
  19. #:use-module (ice-9 threads)
  20. #:use-module (ice-9 match)
  21. #:use-module (ice-9 q)
  22. #:use-module (srfi srfi-1)
  23. #:use-module (srfi srfi-9)
  24. #:use-module (srfi srfi-26)
  25. #:use-module ((guix build syscalls) #:select (set-thread-name))
  26. #:export (pool?
  27. make-pool
  28. pool-enqueue!
  29. pool-idle?
  30. eventually))
  31. ;;; Commentary:
  32. ;;;
  33. ;;; This module implements "worker pools". Worker pools are the low-level
  34. ;;; mechanism that's behind futures: there's a fixed set of threads
  35. ;;; ("workers") that one can submit work to, and one of them will eventually
  36. ;;; pick the submitted tasks.
  37. ;;;
  38. ;;; Unlike futures, these worker pools are meant to be used for tasks that
  39. ;;; have a side-effect. Thus, we never "touch" a task that was submitted like
  40. ;;; we "touch" a future. Instead, we simply assume that the task will
  41. ;;; eventually complete.
  42. ;;;
  43. ;;; Code:
  ;; A worker pool.  'make-pool' fills the fields as follows:
  ;;
  ;;   queue    an (ice-9 q) queue holding the thunks awaiting execution;
  ;;   mutex    the mutex guarding QUEUE and the busy counter;
  ;;   condvar  the condition variable signalled when a thunk is enqueued;
  ;;   workers  the list of worker threads;
  ;;   busy     a thunk returning the number of workers not currently idle.
  (define-record-type <pool>
    (%make-pool queue mutex condvar workers busy)
    pool?
    (queue    pool-queue)
    (mutex    pool-mutex)
    (condvar  pool-condition-variable)
    (workers  pool-workers)
    (busy     pool-busy))
  (define-syntax-rule (without-mutex mutex exp ...)
    "Evaluate EXP... with MUTEX unlocked and return the value of the last EXP.
MUTEX must be held by the calling thread on entry; it is locked again when
control leaves EXP..., whether normally or through a non-local exit."
    ;; Evaluate the MUTEX operand exactly once so that the very same object is
    ;; unlocked and then re-locked, even if the operand has side effects.
    (let ((m mutex))
      (dynamic-wind
        (lambda ()
          (unlock-mutex m))
        (lambda ()
          exp ...)
        (lambda ()
          (lock-mutex m)))))
  (define* (worker-thunk mutex condvar pop-queue
                         #:key
                         (idle (const #t))
                         (busy (const #t))
                         (thread-name "guix worker"))
    "Return the thunk executed by worker threads.

The thunk names its thread THREAD-NAME, then loops forever with MUTEX held:
it calls POP-QUEUE, which must return either a procedure to run or #f when
there is nothing to do.  On #f it calls IDLE, waits on CONDVAR, and calls BUSY
once woken up.  A procedure is called with MUTEX released; any exception it
throws is reported on the current error port and otherwise ignored.  IDLE and
BUSY are called with MUTEX held and default to procedures that do nothing."
    (define (report-error key . args)
      ;; Pre-unwind handler: print a backtrace and the exception while the
      ;; stack of the failing task is still available.
      ;; XXX: In Guile 2.0 ports are not thread-safe, so this could
      ;; crash (Guile 2.2 is fine).
      (let ((stack (make-stack #t)))              ;capture the stack only once
        (when stack
          (display-backtrace stack (current-error-port)))
        (print-exception (current-error-port)
                         (and=> stack (cut stack-ref <> 0))
                         key args)))

    (define (loop)
      (match (pop-queue)
        (#f                                       ;empty queue
         (idle)
         (wait-condition-variable condvar mutex)
         (busy))
        ((? procedure? proc)
         ;; Release MUTEX while executing PROC.
         (without-mutex mutex
           (catch #t
             proc
             (const #f)                           ;swallow the exception...
             report-error))))                     ;... after reporting it
      (loop))

    (lambda ()
      ;; Naming the thread is best-effort: ignore 'system-error' failures.
      (catch 'system-error
        (lambda ()
          (set-thread-name thread-name))
        (const #f))

      (with-mutex mutex
        (loop))))
  (define* (make-pool #:optional (count (current-processor-count))
                      #:key (thread-name "guix worker"))
    "Create and return a pool backed by COUNT worker threads.  THREAD-NAME is the
name the operating system reports for each of these threads."
    (define mutex (make-mutex))
    (define condvar (make-condition-variable))
    (define queue (make-q))

    ;; Number of workers that are not waiting for a task.  Every worker counts
    ;; as busy until it first finds the queue empty.
    (define busy count)

    (define (pop-task)
      ;; Return the next queued task, or #f when the queue is empty.
      (if (q-empty? queue)
          #f
          (q-pop! queue)))

    (define (mark-busy!)
      (set! busy (+ 1 busy)))

    (define (mark-idle!)
      (set! busy (- busy 1)))

    (define (make-worker)
      (worker-thunk mutex condvar pop-task
                    #:busy mark-busy!
                    #:idle mark-idle!
                    #:thread-name thread-name))

    (let* ((thunks  (let collect ((n 0) (acc '()))
                      (if (>= n count)
                          (reverse acc)
                          (collect (1+ n) (cons (make-worker) acc)))))
           (threads (map call-with-new-thread thunks)))
      (%make-pool queue mutex condvar threads
                  (lambda () busy))))
  (define (pool-enqueue! pool thunk)
    "Append THUNK to the queue of POOL and signal POOL's condition variable so
that a waiting worker can pick it up."
    (let ((lock (pool-mutex pool)))
      ;; Both the queue update and the signal happen with the pool lock held.
      (with-mutex lock
        (enq! (pool-queue pool) thunk)
        (signal-condition-variable (pool-condition-variable pool)))))
  (define (pool-idle? pool)
    "Return true when the queue of POOL is empty and none of its workers is
busy, meaning that they are all waiting for a task."
    (let ((busy-count (pool-busy pool)))          ;thunk returning the count
      (with-mutex (pool-mutex pool)
        (and (q-empty? (pool-queue pool))
             (zero? (busy-count))))))
  (define-syntax eventually
    (syntax-rules ()
      "Run EXP eventually on one of the workers of POOL."
      ;; Wrap EXP... in a thunk and hand it over to POOL.
      ((_ pool exp ...)
       (pool-enqueue! pool (lambda () exp ...)))))
  129. ;;; Local Variables:
  130. ;;; eval: (put 'without-mutex 'scheme-indent-function 1)
  131. ;;; End:
  132. ;;; workers.scm ends here