123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151 |
- ;; Original work by Amirouche.
- ;; Some of the comments or docstrings by Zelphir.
- ;; pool of workers that can be used to execute blocking operation in a
- ;; fibers application.
- ;;
- ;; TODO: maybe it will be better to re-base this module on `future` or
- ;; `promise` object.
- (define-module (babelia pool))
- (import (ice-9 match))
- (import (ice-9 q))
- (import (ice-9 threads))
- (import (srfi srfi-9))
- (import (srfi srfi-1))
- (import (fibers))
- (import (fibers channels))
- (import (fibers operations))
- (import (babelia thread))
- (import (babelia okvs ulid))
- (import (babelia log))
- (define %channel #f)
- (define worker-count (- (current-processor-count) 1))
- (define (worker channel)
- (parameterize ((thread-index (random-bytes 2)))
- (let ((worker (make-channel)))
- (let loop ()
- (put-message channel (cons 'worker worker))
- (let* ((work (get-message worker))
- (thunk (car work))
- (return (cdr work))
- ;; Execute thunk and send the returned value. XXX: To be able
- ;; to keep track of jobs, the channel called `return`, is put
- ;; in itself. See procedure pool-for-each-par-map.
- ;; TODO: add a call-with-values
- (out (thunk)))
- (put-message return (cons return out)))
- (loop)))))
- (define (arbiter channel)
- "The arbiter is actually a loop, which looks on a given channel for
- messages. If the message is a worker, that means, that the worker is
- ready to receive more work to perform. If the message is work, it
- means, that this is work to be distributed to the workers. If a worker
- is ready and there is work, work is given to the worker. Workers and
- work are managed in queues. The queues are modified with mutating
- operations. Workers report as ready by putting a worker on the
- distribution channel."
- (let ((works (make-q))
- (workers (make-q)))
- ;; Look for messages on the given channel.
- (let loop ((message (get-message channel)))
- ;; Match messages.
- (match message
- ;; If the message is a worker, try to find work for it and
- ;; give that work to the worker, to get it done.
- (('worker . worker)
- (if (q-empty? works)
- (enq! workers worker)
- ;; If work can be given to the worker, take the work out
- ;; of the queue (it shall only be done by one worker) and
- ;; give it to the worker.
- (let ((work (deq! works)))
- (put-message worker work))))
- ;; If the message is work, try to find a worker, which is
- ;; ready and give that worker the work to get it done.
- (('work . work)
- (if (q-empty? workers)
- (enq! works work)
- ;; If a worker is ready to receive work, take it out of
- ;; the queue (it will be not ready any longer) and give
- ;; it work.
- (let ((worker (deq! workers)))
- (put-message worker work))))
- ;; Unrecognized message? Raise an exception.
- (_ (raise (cons 'fuu message))))
- ;; Look for more messages.
- (loop (get-message channel)))))
- (define-public (pool-init)
- (if %channel
- (error 'babelia "pool can not be initialized more than once")
- (let ((channel (make-channel)))
- (log-debug "pool init")
- (set! %channel channel)
- (let loop ((index worker-count))
- (unless (zero? index)
- (call-with-new-thread (lambda () (worker channel)))
- (loop (- index 1))))
- (arbiter channel))))
- (define (publish thunk)
- "Create an answer channel, on which results will be returned. Put a
- message on the global work distribution channel: tag: 'work, thunk:
- job to perform, channel: channel on which to reply to the message."
- (let ((return (make-channel)))
- (put-message %channel (cons 'work (cons thunk return)))
- return))
- (define-public (pool-apply thunk)
- "Execute THUNK in a worker thread.
- Pause the calling fiber until the result is available."
- (cdr (get-message (publish thunk))))
- (define (select channels)
- (perform-operation
- (apply choice-operation (map get-operation channels))))
- ;; TODO: Maybe add a timeout argument, in order to be able to display
- ;; a nicer error.
- (define-public (pool-for-each-par-map sproc pproc lst)
- "For each item of LST execute (PPROC item) in a worker thread, and
- gather returned value with SPROC. SPROC is executed in the calling
- fiber.
- This a POSIX thread pool based n-for-each-par-map for fibers. It is
- somewhat equivalent to:
- (for-each SSPROC (map PPROC LST))
- But applications of PPROC happens in parallel and waiting for new
- values is not blocking the main thread."
- ;; Look for responses in a loop to get all responses.
- (let loop
- ;; Get the channels, on which responses to messages will arrive.
- ((channels
- ;; Items are probably work to be done, wrapped as thunks.
- (map (lambda (item)
- (publish (lambda ()
- ;; pproc is the procedure, which performs
- ;; the work.
- (pproc item))))
- ;; lst contains the work to be done.
- lst)))
- ;; Only do anything, if there are any channels to retrieve
- ;; responses from.
- (unless (null? channels)
- ;; Select one channel and get the response (result of work) from
- ;; it.
- (match (select channels)
- ((channel . value)
- (sproc value)
- (loop (remove (lambda (x) (eq? x channel)) channels)))
- (else (raise 'fuuubar))))))
|