pool-amirouche.scm 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. ;; Original work by Amirouche.
  2. ;; Some of the comments or docstrings by Zelphir.
  ;; Pool of workers that can be used to execute blocking operations in a
  ;; fibers application.
  5. ;;
  6. ;; TODO: maybe it will be better to re-base this module on `future` or
  7. ;; `promise` object.
  8. (define-module (babelia pool))
  9. (import (ice-9 match))
  10. (import (ice-9 q))
  11. (import (ice-9 threads))
  12. (import (srfi srfi-9))
  13. (import (srfi srfi-1))
  14. (import (fibers))
  15. (import (fibers channels))
  16. (import (fibers operations))
  17. (import (babelia thread))
  18. (import (babelia okvs ulid))
  19. (import (babelia log))
  ;; Global work distribution channel.  It stays #f until `pool-init'
  ;; stores the channel it creates here.
  (define %channel #f)

  ;; Number of worker threads started by `pool-init': one less than the
  ;; number of processors available to the process, but never less than
  ;; one.  Without the lower bound a single-processor machine would get
  ;; zero workers and every published job would wait forever.
  (define worker-count (max 1 (- (current-processor-count) 1)))
  (define (worker channel)
    "Serve jobs forever.  CHANNEL is the work distribution channel.

  The worker owns a private inbox channel.  On every iteration it
  announces itself on CHANNEL with the message ('worker . INBOX), waits
  for a job of the form (THUNK . RETURN) on INBOX, calls THUNK and sends
  (RETURN . VALUE) on RETURN."
    ;; `thread-index' and `random-bytes' are not defined in this file.
    ;; NOTE(review): presumably this gives each worker thread a random
    ;; identifier -- confirm against (babelia thread).
    (parameterize ((thread-index (random-bytes 2)))
      (let ((inbox (make-channel)))
        (define (run-job job)
          ;; JOB is (THUNK . RETURN).  The RETURN channel is sent back
          ;; together with the value so that the receiver can tell which
          ;; job an answer belongs to; see `pool-for-each-par-map'.
          ;; TODO: add a call-with-values
          ;; NOTE(review): nothing here catches exceptions raised by
          ;; THUNK; presumably the requester then never gets a reply --
          ;; confirm.
          (let ((thunk (car job))
                (reply-to (cdr job)))
            (put-message reply-to (cons reply-to (thunk)))))
        (let serve ()
          ;; Report as ready, then wait for the job that gets assigned.
          (put-message channel (cons 'worker inbox))
          (run-job (get-message inbox))
          (serve)))))
  (define (arbiter channel)
    "Match ready workers with pending work, forever.

  The arbiter is a loop that reads messages from CHANNEL.  A message of
  the form ('worker . WORKER) means that WORKER, a channel, is ready to
  receive work.  A message of the form ('work . WORK) carries work to be
  handed to a worker.  Whenever a worker is ready and work is available,
  the work is put on the worker's channel.  Otherwise the worker or the
  work is stored in a queue until its counterpart arrives.  The queues
  are modified with mutating operations.

  Any other message signals an error that carries the offending message.
  This procedure does not return normally."
    (let ((works (make-q))
          (workers (make-q)))
      ;; Look for messages on the given channel.
      (let loop ((message (get-message channel)))
        (match message
          ;; A worker reports as ready: hand it the oldest pending work,
          ;; or remember the worker until work arrives.
          (('worker . worker)
           (if (q-empty? works)
               (enq! workers worker)
               ;; Dequeue the work so that only one worker performs it.
               (put-message worker (deq! works))))
          ;; New work arrives: hand it to the longest-waiting worker, or
          ;; remember the work until a worker reports as ready.
          (('work . work)
           (if (q-empty? workers)
               (enq! works work)
               ;; Dequeue the worker; it is not ready any longer.
               (put-message (deq! workers) work)))
          ;; Unrecognized message.  The core `raise' binding in Guile is
          ;; the POSIX "send a signal" procedure, so `error' is used to
          ;; report the problem together with the message.
          (_
           (error 'babelia "pool arbiter received an unrecognized message"
                  message)))
        ;; Look for more messages.
        (loop (get-message channel)))))
  (define-public (pool-init)
    "Initialize the pool: create the work distribution channel, store it
  in %channel, start `worker-count' worker threads and then run the
  arbiter on the channel.

  Signals an error when the pool has already been initialized.  Because
  the arbiter loops forever, this procedure does not return normally."
    ;; Guard: a second initialization is refused.
    (when %channel
      (error 'babelia "pool can not be initialized more than once"))
    (let ((distribution (make-channel)))
      (log-debug "pool init")
      (set! %channel distribution)
      ;; Start one thread per worker; each one runs the worker loop on
      ;; the shared distribution channel.
      (do ((remaining worker-count (- remaining 1)))
          ((zero? remaining))
        (call-with-new-thread (lambda () (worker distribution))))
      (arbiter distribution)))
  (define (publish thunk)
    "Submit THUNK to the pool and return the channel on which the answer
  will arrive.

  A fresh answer channel is created and the message
  ('work . (THUNK . ANSWER-CHANNEL)) is put on the global work
  distribution channel %channel."
    (let* ((answer (make-channel))
           ;; The job pairs the thunk with the channel to reply on.
           (job (cons thunk answer))
           ;; The 'work tag marks the message as work.
           (envelope (cons 'work job)))
      (put-message %channel envelope)
      answer))
  (define-public (pool-apply thunk)
    "Execute THUNK in a worker thread and return its value.
  The calling fiber is paused until the result is available."
    (let* ((answer-channel (publish thunk))
           ;; The reply is a pair whose cdr is the value of THUNK.
           (reply (get-message answer-channel)))
      (cdr reply)))
  (define (select channels)
    "Perform a choice between get operations on all of CHANNELS and
  return the result of that operation."
    (let* ((receivers (map get-operation channels))
           (any-receiver (apply choice-operation receivers)))
      (perform-operation any-receiver)))
  101. ;; TODO: Maybe add a timeout argument, in order to be able to display
  102. ;; a nicer error.
  (define-public (pool-for-each-par-map sproc pproc lst)
    "For each item of LST execute (PPROC item) in a worker thread, and
  gather the returned values with SPROC.  SPROC is executed in the
  calling fiber.

  This is a POSIX thread pool based n-for-each-par-map for fibers.  It is
  somewhat equivalent to:

    (for-each SPROC (map PPROC LST))

  But applications of PPROC happen in parallel, values are passed to
  SPROC in the order in which they become available, and waiting for new
  values does not block the main thread.

  Signals an error if a reply is not a (CHANNEL . VALUE) pair."
    ;; Publish one job per item and keep the answer channels.  Then loop
    ;; until an answer has been received on every channel.
    (let loop ((channels
                (map (lambda (item)
                       ;; Wrap the work on ITEM as a thunk for the pool.
                       (publish (lambda () (pproc item))))
                     lst)))
      ;; Only do anything if there are channels left to wait on.
      (unless (null? channels)
        ;; Wait for whichever answer arrives first.
        (match (select channels)
          ((channel . value)
           (sproc value)
           ;; The answer names the channel it arrived on, so that channel
           ;; can be dropped from the set that is still waited on.
           (loop (delq channel channels)))
          ;; The core `raise' binding in Guile is the POSIX "send a
          ;; signal" procedure, so `error' is used to report the problem.
          (unexpected
           (error 'babelia "pool received an unexpected reply"
                  unexpected))))))