pool.scm 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201
  ;; CHANNELS
  ;; channel-receive: The channel on which the work-distributor receives its
  ;; messages. It is created by pool-initializer, which returns it to the outer
  ;; context (to be passed to publish when submitting work) and gives it to all
  ;; workers, so that they can report themselves as ready to receive more work.
  ;; channel-return: A channel created by publish for each piece of work. It is
  ;; sent together with the work to the work-distributor, which forwards it to a
  ;; worker, so that the worker can send the result of the completed work on
  ;; this channel back to the outer context.
  9. (define-module (fibers-pool))
  10. (use-modules
  11. ;; FIFO queue, not functional, using mutation
  12. ;; https://www.gnu.org/software/guile/manual/html_node/Queues.html
  13. (ice-9 q)
  14. (ice-9 match)
  15. (ice-9 threads)
  16. (rnrs exceptions)
  17. (rnrs conditions)
  18. ;; fibers internals are needed for creating schedulers without running anything
  19. ;; in them immediately
  20. (fibers)
  21. (fibers channels)
  22. (fibers internal))
  ;; displayln: print msg with display to the current output port and finish
  ;; the line with a newline.
  (define (displayln msg)
    (display msg)
    (newline))
  ;; work-distributor: message loop which pairs incoming work with ready
  ;; workers.
  ;;
  ;; channel-receive is the channel on which all messages arrive. Two kinds of
  ;; messages are understood:
  ;;   ('worker-ready . worker-channel)  a worker announces that it is ready and
  ;;                                     names the channel on which it takes work
  ;;   ('work . job)                     a job, which is forwarded unchanged to
  ;;                                     a worker as ('work . job)
  ;; Whichever side has no counterpart yet waits in a FIFO queue. Any other
  ;; message raises an error condition which carries the message as irritant,
  ;; which is the only way this procedure ends.
  (define (work-distributor channel-receive)
    ;; The queues are mutated in place, so one instance of each serves the
    ;; whole lifetime of the loop.
    (define pending-jobs (make-q))
    (define idle-workers (make-q))

    ;; Print the state of both queues before waiting for the next message.
    (define (print-status)
      (displayln "[WORK-DISTRIBUTOR]: work-distributor is listening for messages")
      (display "[WORK-DISTRIBUTOR]: number of ready workers in queue: ")
      (displayln (q-length idle-workers))
      (display "[WORK-DISTRIBUTOR]: number of works in queue: ")
      (displayln (q-length pending-jobs)))

    ;; Send a job, tagged as work, to the given worker.
    (define (hand-over worker-channel job)
      (put-message worker-channel (cons 'work job)))

    (let serve ()
      (print-status)
      (match (pk 'work-distributor-received-msg (get-message channel-receive))
        (('worker-ready . worker-channel)
         (displayln "[WORK-DISTRIBUTOR]: work-distributor received ready worker channel")
         ;; Serve the oldest waiting job, or remember the worker for later.
         (if (q-empty? pending-jobs)
             (enq! idle-workers worker-channel)
             (hand-over worker-channel (deq! pending-jobs)))
         (serve))
        (('work . job)
         ;; Give the job to the longest waiting worker, or remember the job
         ;; until a worker reports as ready.
         (if (q-empty? idle-workers)
             (enq! pending-jobs job)
             (hand-over (deq! idle-workers) job))
         (serve))
        (unknown
         (raise
          (condition
           (make-error)
           (make-message-condition "work-distributor received unrecognized message")
           (make-irritants-condition (list unknown))))))))
  ;; worker: endless job-processing loop.
  ;;
  ;; worker-index     identifies the worker; the loop itself does not use it
  ;; channel-receive  channel of the work-distributor, on which the worker
  ;;                  reports itself as ready
  ;;
  ;; The worker creates one channel of its own. In every iteration it sends
  ;; ('worker-ready . own-channel) on channel-receive, waits on its own channel
  ;; for a message of the shape ('work . (thunk . channel-return)), calls the
  ;; thunk and puts the outcome on channel-return.
  ;;
  ;; If the thunk raises, the raised object is caught, a line is printed and
  ;; the raised object is put on channel-return in place of a result. Without
  ;; this the exception would leave the loop, ending this worker, and nothing
  ;; would ever be put on channel-return, so whoever waits for the result would
  ;; wait forever.
  ;;
  ;; A message of any other shape raises an error condition which carries the
  ;; message as irritant.
  (define worker
    (lambda (worker-index channel-receive)
      (let ([channel-worker (make-channel)])
        (displayln "[WORKER]: before worker message loop")
        (let loop ()
          ;; Report as ready. Give my own channel to the work-distributor to
          ;; let it send me work.
          (put-message channel-receive
                       (cons 'worker-ready channel-worker))
          ;; Get messages sent to me by the distributor on my own channel.
          (match (pk 'worker-got-msg (get-message channel-worker))
            [('work . (thunk . channel-return))
             ;; Run the job. A raised object becomes the outcome, so that the
             ;; worker stays alive and the return channel always gets a value.
             (let ([outcome
                    (guard (raised-object
                            [else
                             (displayln "[WORKER]: work raised an exception, sending the raised object as result")
                             raised-object])
                      (thunk))])
               (put-message channel-return outcome))
             (loop)]
            ;; On any other message raise a condition.
            [other
             (raise
              (condition
               (make-error)
               (make-message-condition "worker received unrecognized message")
               (make-irritants-condition (list other))))])))))
  ;; pool-initializer: start the workers and the work-distributor and return
  ;; the channel on which work can be published.
  ;;
  ;; #:parallelism  number of worker fibers to spawn; it is also passed to
  ;;                make-scheduler. Defaults to (current-processor-count).
  ;;
  ;; Two threads are started: one calls run-fibers on a scheduler created
  ;; here, with a thunk that spawns the worker fibers, the other runs
  ;; work-distributor. The returned channel is the one which has to be given
  ;; to publish.
  (define* (pool-initializer #:key (parallelism (current-processor-count)))
    (define channel-receive (make-channel))
    (define scheduler (make-scheduler #:parallelism parallelism))

    ;; Spawn one worker fiber per index, counting down from parallelism to 1.
    ;; NOTE(review): spawn-fiber is called without naming a scheduler; the
    ;; original code carried a TODO to use the created scheduler here --
    ;; confirm that fibers spawned from within run-fibers end up on it.
    (define (spawn-workers)
      (do ([index parallelism (- index 1)])
          ((zero? index))
        (display "[POOL INIT THREAD]: will spawn fiber ")
        (displayln index)
        (spawn-fiber (lambda () (worker index channel-receive)))
        (displayln "[POOL INIT THREAD]: fiber spawned")))

    ;; Body of the thread which hosts the worker fibers.
    (define (host-worker-fibers)
      (run-fibers spawn-workers #:scheduler scheduler)
      (displayln "[POOL INIT]: pool init thread returning"))

    ;; run-fibers is given a thread of its own, because (according to the
    ;; original author's note) it blocks its caller.
    (call-with-new-thread host-worker-fibers)
    (displayln "[POOL INIT]: will start work-distributor")
    (call-with-new-thread
     (lambda ()
       (work-distributor channel-receive)))
    ;; Hand the channel to the outside context for use with publish.
    channel-receive)
  ;; publish: submit a thunk to the pool.
  ;;
  ;; work-as-thunk    procedure of no arguments which does the work
  ;; channel-receive  the channel returned by pool-initializer
  ;;
  ;; A fresh return channel is created and sent, together with the thunk, as
  ;; ('work . (thunk . channel-return)) on channel-receive. The return channel
  ;; is the result of publish; the outcome of the work can be taken from it.
  (define (publish work-as-thunk channel-receive)
    (let* ([channel-return (make-channel)]
           ;; Same structure as (cons 'work (cons thunk channel)).
           [work-message `(work ,work-as-thunk . ,channel-return)]
           [announcement
            (simple-format
             #f "[PUBLISHER]: will publish the following work: ~a\n"
             work-message)])
      (display announcement)
      (put-message channel-receive work-message)
      (displayln "[PUBLISHER]: work published")
      channel-return))
  ;; busy-work: keep the CPU busy by counting up from 0 in steps of 1.
  ;;
  ;; iterations  optional upper bound of the count; defaults to 500000000,
  ;;             which is the bound this procedure always used (5e8), written
  ;;             as an exact integer so that the comparison in the loop stays
  ;;             between exact numbers.
  ;;
  ;; Returns the first counter value which is not smaller than iterations,
  ;; that is iterations itself for a non-negative integer and 0 for a bound
  ;; of 0 or less.
  (define busy-work
    (lambda* (#:optional (iterations 500000000))
      (let loop ([i 0])
        (if (< i iterations)
            (loop (+ i 1))
            i))))
  ;; Example usage, executed when this file is loaded.
  ;; Start a pool with two workers. c-rec is the channel on which work is
  ;; published.
  (define c-rec (pool-initializer #:parallelism 2))
  ;; Publish two jobs. Each call returns the channel on which the result of
  ;; that job will be put. Note that c-ret-2 is published before c-ret-1.
  (define c-ret-2 (publish (lambda () (busy-work)) c-rec))
  (define c-ret-1 (publish (lambda () (busy-work)) c-rec))
  ;; Take both results from their return channels, in the order in which the
  ;; jobs were published.
  (get-message c-ret-2)
  (get-message c-ret-1)