123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201 |
;; CHANNELS
;; channel-receive: The channel on which the work-distributor receives its
;; messages: work published from the outer context and "ready" notifications
;; from the workers. It is created in the pool-initializer and given to all
;; workers, so that they can report themselves as ready to receive more work.
;; channel-return: A channel created by the publish procedure for each piece of
;; published work. It is sent along with the work to the work-distributor, which
;; passes it on to a worker, so that the worker can put the result of the
;; completed work on it for the outer context to read.
- (define-module (fibers-pool))
- (use-modules
- ;; FIFO queue, not functional, using mutation
- ;; https://www.gnu.org/software/guile/manual/html_node/Queues.html
- (ice-9 q)
- (ice-9 match)
- (ice-9 threads)
- (rnrs exceptions)
- (rnrs conditions)
- ;; fibers internals are needed for creating schedulers without running anything
- ;; in them immediately
- (fibers)
- (fibers channels)
- (fibers internal))
;; Write MSG to the current output port using `display`, then terminate the
;; line with a newline.
(define (displayln msg)
  (display msg)
  (newline))
;; Message loop that pairs published work with ready workers.
;;
;; CHANNEL-RECEIVE is the channel this procedure reads all its messages from.
;; Two kinds of messages are understood:
;;   ('worker-ready . channel-worker)  a worker announces that it is idle and
;;                                     can be reached on CHANNEL-WORKER;
;;   ('work . work)                    a piece of work to be run; the worker
;;                                     procedure in this file expects WORK to be
;;                                     a pair (thunk . channel-return).
;; Work that arrives while no worker is idle is queued, and so are workers that
;; report ready while no work is pending. Both queues are FIFO.
;; Any other message raises an error condition carrying the offending message
;; as irritant. The procedure loops forever otherwise and never returns.
(define work-distributor
  (lambda (channel-receive)
    ;; Both queues are mutable (ice-9 q), so they are created once and shared by
    ;; every iteration of the loop.
    (let ([pending-work (make-q)]
          [idle-workers (make-q)])
      ;; Send JOB, tagged as work, to the worker listening on TARGET-CHANNEL.
      (define (hand-over! target-channel job)
        (put-message target-channel (cons 'work job)))
      (let listen ()
        (displayln "[WORK-DISTRIBUTOR]: work-distributor is listening for messages")
        (display "[WORK-DISTRIBUTOR]: number of ready workers in queue: ")
        (displayln (q-length idle-workers))
        (display "[WORK-DISTRIBUTOR]: number of works in queue: ")
        (displayln (q-length pending-work))
        (match (pk 'work-distributor-received-msg (get-message channel-receive))
          ;; A worker became idle: feed it the oldest pending work, or remember
          ;; the worker until work arrives.
          [('worker-ready . idle-channel)
           (displayln "[WORK-DISTRIBUTOR]: work-distributor received ready worker channel")
           (if (q-empty? pending-work)
               (enq! idle-workers idle-channel)
               (hand-over! idle-channel (deq! pending-work)))
           (listen)]
          ;; New work arrived: give it to the longest-waiting idle worker, or
          ;; queue it until a worker reports ready.
          [('work . job)
           (if (q-empty? idle-workers)
               (enq! pending-work job)
               (hand-over! (deq! idle-workers) job))
           (listen)]
          ;; Anything else is a protocol violation.
          [unexpected
           (raise
            (condition
             (make-error)
             (make-message-condition "work-distributor received unrecognized message")
             (make-irritants-condition (list unexpected))))])))))
;; Message loop of a single pool worker.
;;
;; WORKER-INDEX is accepted for the caller's bookkeeping; it is not used inside
;; the loop. CHANNEL-RECEIVE is the work-distributor's channel, on which this
;; worker announces itself as ready.
;;
;; The worker creates its own channel, then repeats forever:
;;   1. put ('worker-ready . own-channel) on CHANNEL-RECEIVE;
;;   2. wait for a message on its own channel;
;;   3. on ('work . (thunk . channel-return)) call THUNK and put the outcome on
;;      CHANNEL-RETURN.
;; If THUNK returns normally, its value is put on CHANNEL-RETURN unchanged.
;; If THUNK throws, the throw is caught and the list
;; ('work-failed key arg ...) is put on CHANNEL-RETURN instead. Without this,
;; the exception would escape the loop: the worker would never report ready
;; again and nothing would ever be put on CHANNEL-RETURN, leaving the consumer
;; waiting forever.
;; Any message of another shape raises an error condition carrying the message
;; as irritant.
(define worker
  (lambda (worker-index channel-receive)
    (let ([channel-worker (make-channel)])
      (displayln "[WORKER]: before worker message loop")
      (let loop ()
        ;; Report as ready. Give my own channel to the work-distributor to let
        ;; it send me work.
        (put-message channel-receive
                     (cons 'worker-ready channel-worker))
        ;; Get messages sent to me by the distributor on my own channel.
        (match (pk 'worker-got-msg (get-message channel-worker))
          ;; If I receive work, do the work and return it on the channel-return.
          [('work . (thunk . channel-return))
           ;; Run the work under `catch #t`, so that a failing thunk cannot
           ;; take this worker down. The handler runs after unwinding and turns
           ;; the throw into an ordinary, tagged result value.
           (let ([outcome
                  (catch #t
                    thunk
                    (lambda (key . args)
                      (cons 'work-failed (cons key args))))])
             ;; Put the outcome on the return channel, so that anyone who has
             ;; a binding of the return channel can access it.
             (put-message channel-return outcome))
           (loop)]
          ;; On any other message raise a condition.
          [other
           (raise
            (condition
             (make-error)
             (make-message-condition "worker received unrecognized message")
             (make-irritants-condition (list other))))])))))
;; Set up the pool and return the channel on which it accepts work.
;;
;; Keyword argument #:parallelism (default: `current-processor-count`) is both
;; the parallelism passed to `make-scheduler` and the number of worker fibers
;; that are spawned.
;;
;; Two threads are started, in this order:
;;   1. a thread calling `run-fibers` on the freshly made scheduler; its
;;      initial thunk spawns the worker fibers, counting the index down from
;;      PARALLELISM to 1;
;;   2. a thread running `work-distributor` on the new channel.
;; `run-fibers` gets its own thread so that this procedure does not have to
;; wait for it (the original author's note: run-fibers blocks).
;;
;; Returns the channel to hand to `publish`.
(define pool-initializer
  (lambda* (#:key (parallelism (current-processor-count)))
    (let* ([channel-receive (make-channel)]
           [scheduler (make-scheduler #:parallelism parallelism)])
      ;; Initial thunk for `run-fibers`: spawn one worker fiber per index. The
      ;; workers are meant to stay alive for as long as the program runs, so
      ;; no further fibers need to be spawned in this scheduler later.
      (define (spawn-workers)
        (do ([remaining parallelism (- remaining 1)])
            ((zero? remaining))
          (display "[POOL INIT THREAD]: will spawn fiber ")
          (displayln remaining)
          (spawn-fiber (lambda () (worker remaining channel-receive)))
          (displayln "[POOL INIT THREAD]: fiber spawned")))
      ;; Thread 1: drive the scheduler that hosts the worker fibers.
      (call-with-new-thread
       (lambda ()
         (run-fibers spawn-workers #:scheduler scheduler)
         (displayln "[POOL INIT]: pool init thread returning")))
      (displayln "[POOL INIT]: will start work-distributor")
      ;; Thread 2: the distributor that matches work with ready workers.
      (call-with-new-thread
       (lambda ()
         (work-distributor channel-receive)))
      ;; Hand the receive channel to the outside context, so that it can be
      ;; used with ~publish~ to publish work.
      channel-receive)))
;; Publish a piece of work to the pool.
;;
;; WORK-AS-THUNK is the procedure of no arguments to be run. CHANNEL-RECEIVE is
;; the channel the work-distributor listens on (as returned by
;; `pool-initializer`).
;;
;; A fresh return channel is created, the message
;; ('work . (WORK-AS-THUNK . return-channel)) is put on CHANNEL-RECEIVE, and
;; the return channel is returned, so that the caller can read the result of
;; the work from it.
(define publish
  (lambda (work-as-thunk channel-receive)
    (let* ([channel-return (make-channel)]
           ;; Same shape as (cons 'work (cons thunk channel)).
           [work-message (cons* 'work work-as-thunk channel-return)])
      ;; Destination #t makes simple-format write to the current output port.
      (simple-format
       #t "[PUBLISHER]: will publish the following work: ~a\n"
       work-message)
      (put-message channel-receive work-message)
      (displayln "[PUBLISHER]: work published")
      ;; The caller reads the result from this channel.
      channel-return)))
;; Burn CPU time by counting up from 0 in steps of 1.
;;
;; Optional argument LIMIT (default 5e8, the previously hard-coded value) is
;; the bound at which counting stops. Calling (busy-work) with no argument
;; behaves exactly as before.
;;
;; Returns the first counter value that is not less than LIMIT: LIMIT itself
;; for a non-negative integer-valued LIMIT, and 0 when LIMIT is zero or
;; negative. The counter starts as an exact 0 and is only ever incremented by
;; an exact 1, so the result is always an exact integer.
(define busy-work
  (lambda* (#:optional (limit 5e8))
    (let loop ([i 0])
      (if (< i limit)
          (loop (+ i 1))
          i))))
;; Demo run, executed when this file is loaded.
;; Start a pool with two workers and keep the channel it accepts work on.
(define c-rec
  (pool-initializer #:parallelism 2))
;; Publish two pieces of busy work; each call yields its own return channel.
(define c-ret-2
  (publish (lambda () (busy-work)) c-rec))
(define c-ret-1
  (publish (lambda () (busy-work)) c-rec))
;; Wait for both results, in the order in which the work was published.
(get-message c-ret-2)
(get-message c-ret-1)
|