pool.scm 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. ;; CHANNELS
  2. ;; channel-receive: The channel, which is used by the work-distributor to
  3. ;; receive messages from the outer context. This channel is created in the
  4. ;; pool-initializer and given to all workers, so that they can report themselves
  5. ;; as ready to receive more work.
  6. ;; channel-return: This channel is created by the publish procedure and given to
  7. ;; work-distributor, which gives it to the workers, so that they can send
  8. ;; results of completed work as messages on this channel to the outer context.
  9. (define-module (fibers-pool))
  10. (use-modules
  11. ;; FIFO queue, not functional, using mutation
  12. ;; https://www.gnu.org/software/guile/manual/html_node/Queues.html
  13. (ice-9 q)
  14. (ice-9 match)
  15. (ice-9 threads)
  16. (rnrs exceptions)
  17. (rnrs conditions)
  18. ;; fibers internals are needed for creating schedulers without running anything
  19. ;; in them immediately
  20. (fibers)
  21. (fibers channels)
  22. (fibers internal))
  23. (define displayln
  24. (lambda (msg)
  25. (display msg)
  26. (newline)))
  27. (define run-worker-with-work
  28. (lambda (scheduler work-thunk-and-channel-return)
  29. ;; TODO: Somehow asynchronously start fibers as workers, avoiding the
  30. ;; overhead of one thread per worker, if possible.
  31. (call-with-new-thread
  32. (lambda ()
  33. (run-fibers
  34. (lambda ()
  35. (spawn-fiber (lambda () (worker index channel-receive))))
  36. #:scheduler scheduler)))))
  37. (define work-distributor
  38. (lambda* (scheduler channel-receive #:key (max-busy-workers-count +inf.0))
  39. (let loop ([busy-workers-count 0]
  40. [work-queue (make-q)])
  41. (displayln "[WORK-DISTRIBUTOR]: work-distributor is listening for messages")
  42. (display "[WORK-DISTRIBUTOR]: number of works in queue: ")
  43. (displayln (q-length work-queue))
  44. ;; Get a message form the channel-receive and match it against expected
  45. ;; cases: (1) new work to be done (2) worker finished.
  46. (match (pk 'work-distributor-received-msg (get-message channel-receive))
  47. ;; A worker has finished its work. If there is more work in the work
  48. ;; queue, start a new worker to work on that work, otherwise loop again
  49. ;; with reduced busy workers count for the finished worker, to read the
  50. ;; next message, if there is any.
  51. [('worker-finished)
  52. (cond
  53. [(q-empty? work-queue)
  54. (loop (- busy-workers-count 1) work-queue)]
  55. [else
  56. ;; Invariant: When a worker finishes, the maximum of simultaneously
  57. ;; busy workers cannot be reached, because the worker was one of
  58. ;; the workers, which were started when the number of
  59. ;; simultaneously busy workers was lower than the maximum,
  60. ;; otherwise the worker should never have been started.
  61. (run-worker-with-work scheduler (deq! work-queue))
  62. (loop (+ busy-workers-count 1) work-queue)])]
  63. ;; New work has been received. We check, whether the busy worker count
  64. ;; allows us to run more workers. If the maximum of simultaneously busy
  65. ;; workers is not yet reached, we start a new worker, which will be
  66. ;; given the received work. If the maximum of simultaneously busy
  67. ;; workers is reached, we put the received work in a queue to be worked
  68. ;; on later. Received work consists of a thunk to be run in a worker and
  69. ;; a channel, on which the worker shall report the result.
  70. [('work . work-thunk-and-channel-return)
  71. (cond
  72. [(< busy-workers-count max-busy-workers-count)
  73. (run-worker-with-work scheduler work-thunk-and-channel-return)
  74. (loop (+ busy-workers-count 1) work-queue)]
  75. [else
  76. (enq! work-queue work-thunk-and-channel-return)
  77. (loop busy-workers-count work-queue)])]
  78. ;; On any other message raise a condition. Do not loop, instead return.
  79. [other
  80. (raise
  81. (condition
  82. (make-error)
  83. (make-message-condition "work-distributor received unrecognized message")
  84. (make-irritants-condition (list other))))]))))
  85. (define worker
  86. (lambda (worker-index channel-receive)
  87. (let ([channel-worker (make-channel)])
  88. (let loop ()
  89. ;; Report as ready. Give my own channel to the work-distributor to let
  90. ;; it send me work.
  91. (put-message channel-receive
  92. (cons 'worker-ready
  93. channel-worker))
  94. ;; Get messages sent to me by the work distributor on my own channel.
  95. (match (pk 'worker-got-msg (get-message channel-worker))
  96. ;; If I receive work, do the work and return it on the
  97. ;; channel-return. Afterwards report to the work-distributor that this
  98. ;; worker has finised its work on channel-receive. Do not loop, as
  99. ;; there will be a new worker created for each new work.
  100. [('work . (thunk . channel-return))
  101. (put-message channel-return (thunk))
  102. (put-message channel-receive 'worker-finished)]
  103. ;; On any other message raise a condition.
  104. [other
  105. (raise
  106. (condition
  107. (make-error)
  108. (make-message-condition "worker received unrecognized message")
  109. (make-irritants-condition (list other))))])))))
  110. (define pool-initializer
  111. (lambda* (#:key (parallelism (current-processor-count)))
  112. (let ([channel-receive (make-channel)]
  113. [scheduler (make-scheduler #:parallelism parallelism)])
  114. (displayln "[POOL INIT]: will start work-distributor")
  115. (call-with-new-thread
  116. (lambda ()
  117. (work-distributor scheduler channel-receive)))
  118. ;; Return the channel for receiving work, so that the outside context can
  119. ;; make use of it when calling ~publish~ to publish work.
  120. channel-receive)))
  121. (define publish
  122. (lambda (work-as-thunk channel-receive)
  123. ;; The result of the computation can be taken from ~channel-return~.
  124. (let ([channel-return (make-channel)])
  125. ;; Put work tagged as work on the receive channel of the work-distributor.
  126. (let ([work-message (cons 'work (cons work-as-thunk channel-return))])
  127. (display
  128. (simple-format
  129. #f "[PUBLISHER]: will publish the following work: ~a\n"
  130. work-message))
  131. (put-message channel-receive work-message))
  132. (displayln "[PUBLISHER]: work published")
  133. ;; Return the ~channel-return~, so that the outside context can get
  134. ;; results from it.
  135. channel-return)))
  136. (define busy-work
  137. (lambda ()
  138. (let loop ([i 0])
  139. (cond
  140. [(< i 5e8) (loop (+ i 1))]
  141. [else i]))))
  142. (define c-rec (pool-initializer #:parallelism 2))
  143. (define c-ret-2 (publish (lambda () (busy-work)) c-rec))
  144. (define c-ret-1 (publish (lambda () (busy-work)) c-rec))
  145. (get-message c-ret-2)
  146. (get-message c-ret-1)