123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393 |
- ;;;; Copyright (C) 1996, 1998, 2001, 2002, 2003, 2006, 2010, 2011,
- ;;;; 2012, 2018 Free Software Foundation, Inc.
- ;;;;
- ;;;; This library is free software; you can redistribute it and/or
- ;;;; modify it under the terms of the GNU Lesser General Public
- ;;;; License as published by the Free Software Foundation; either
- ;;;; version 3 of the License, or (at your option) any later version.
- ;;;;
- ;;;; This library is distributed in the hope that it will be useful,
- ;;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
- ;;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- ;;;; Lesser General Public License for more details.
- ;;;;
- ;;;; You should have received a copy of the GNU Lesser General Public
- ;;;; License along with this library; if not, write to the Free Software
- ;;;; Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
- ;;;;
- ;;;; ----------------------------------------------------------------
- ;;;; threads.scm -- User-level interface to Guile's thread system
- ;;;; 4 March 1996, Anthony Green <green@cygnus.com>
- ;;;; Modified 5 October 1996, MDJ <djurfeldt@nada.kth.se>
- ;;;; Modified 6 April 2001, ttn
- ;;;; ----------------------------------------------------------------
- ;;;;
- ;;; Commentary:
- ;; This module is documented in the Guile Reference Manual.
- ;;; Code:
- (define-module (ice-9 threads)
- #:use-module (ice-9 match)
- #:use-module (ice-9 control)
- ;; These bindings are marked as #:replace because when deprecated code
- ;; is enabled, (ice-9 deprecated) also exports these names.
- ;; (Referencing one of the deprecated names prints a warning directing
- ;; the user to these bindings.) Anyway once we can remove the
- ;; deprecated bindings, we should use #:export instead of #:replace
- ;; for these.
- #:replace (call-with-new-thread
- yield
- cancel-thread
- join-thread
- thread?
- make-mutex
- make-recursive-mutex
- lock-mutex
- try-mutex
- unlock-mutex
- mutex?
- mutex-owner
- mutex-level
- mutex-locked?
- make-condition-variable
- wait-condition-variable
- signal-condition-variable
- broadcast-condition-variable
- condition-variable?
- current-thread
- all-threads
- thread-exited?
- total-processor-count
- current-processor-count)
- #:export (begin-thread
- make-thread
- with-mutex
- monitor
- parallel
- letpar
- par-map
- par-for-each
- n-par-map
- n-par-for-each
- n-for-each-par-map
- %thread-handler))
- ;; Note that this extension also defines %make-transcoded-port, which is
- ;; not exported but is used by (rnrs io ports).
- (eval-when (expand eval load)
- (load-extension (string-append "libguile-" (effective-version))
- "scm_init_ice_9_threads"))
- (define-syntax-rule (with-mutex m e0 e1 ...)
- (let ((x m))
- (dynamic-wind
- (lambda () (lock-mutex x))
- (lambda () (begin e0 e1 ...))
- (lambda () (unlock-mutex x)))))
- (define cancel-tag (make-prompt-tag "cancel"))
- (define (cancel-thread thread . values)
- "Asynchronously interrupt the target @var{thread} and ask it to
- terminate, returning the given @var{values}. @code{dynamic-wind} post
- thunks will run, but throw handlers will not. If @var{thread} has
- already terminated or been signaled to terminate, this function is a
- no-op."
- (system-async-mark
- (lambda ()
- (catch #t
- (lambda ()
- (apply abort-to-prompt cancel-tag values))
- (lambda _
- (error "thread cancellation failed, throwing error instead???"))))
- thread))
- (define thread-join-data (make-object-property))
- (define %thread-results (make-object-property))
- (define* (call-with-new-thread thunk #:optional handler)
- "Call @code{thunk} in a new thread and with a new dynamic state,
- returning a new thread object representing the thread. The procedure
- @var{thunk} is called via @code{with-continuation-barrier}.
- When @var{handler} is specified, then @var{thunk} is called from within
- a @code{catch} with tag @code{#t} that has @var{handler} as its handler.
- This catch is established inside the continuation barrier.
- Once @var{thunk} or @var{handler} returns, the return value is made the
- @emph{exit value} of the thread and the thread is terminated."
- (let ((cv (make-condition-variable))
- (mutex (make-mutex))
- (thunk (if handler
- (lambda () (catch #t thunk handler))
- thunk))
- (thread #f))
- (define (call-with-backtrace thunk)
- (let ((err (current-error-port)))
- (catch #t
- (lambda () (%start-stack 'thread thunk))
- (lambda _ (values))
- (lambda (key . args)
- ;; Narrow by three: the dispatch-exception,
- ;; this thunk, and make-stack.
- (let ((stack (make-stack #t 3)))
- (false-if-exception
- (begin
- (when stack
- (display-backtrace stack err))
- (let ((frame (and stack (stack-ref stack 0))))
- (print-exception err frame key args)))))))))
- (with-mutex mutex
- (%call-with-new-thread
- (lambda ()
- (call-with-values
- (lambda ()
- (call-with-prompt cancel-tag
- (lambda ()
- (lock-mutex mutex)
- (set! thread (current-thread))
- (set! (thread-join-data thread) (cons cv mutex))
- (signal-condition-variable cv)
- (unlock-mutex mutex)
- (call-with-unblocked-asyncs
- (lambda () (call-with-backtrace thunk))))
- (lambda (k . args)
- (apply values args))))
- (lambda vals
- (lock-mutex mutex)
- ;; Probably now you're wondering why we are going to use
- ;; the cond variable as the key into the thread results
- ;; object property. It's because there is a possibility
- ;; that the thread object itself ends up as part of the
- ;; result, and if that happens we create a cycle whereby
- ;; the strong reference to a thread in the value of the
- ;; weak-key hash table used by the object property prevents
- ;; the thread from ever being collected. So instead we use
- ;; the cv as the key. Weak-key hash tables, amirite?
- (set! (%thread-results cv) vals)
- (broadcast-condition-variable cv)
- (unlock-mutex mutex)
- (apply values vals)))))
- (let lp ()
- (unless thread
- (wait-condition-variable cv mutex)
- (lp))))
- thread))
- (define* (join-thread thread #:optional timeout timeoutval)
- "Suspend execution of the calling thread until the target @var{thread}
- terminates, unless the target @var{thread} has already terminated."
- (match (thread-join-data thread)
- (#f (error "foreign thread cannot be joined" thread))
- ((cv . mutex)
- (lock-mutex mutex)
- (let lp ()
- (cond
- ((%thread-results cv)
- => (lambda (results)
- (unlock-mutex mutex)
- (apply values results)))
- ((if timeout
- (wait-condition-variable cv mutex timeout)
- (wait-condition-variable cv mutex))
- (lp))
- (else timeoutval))))))
- (define* (try-mutex mutex)
- "Try to lock @var{mutex}. If the mutex is already locked, return
- @code{#f}. Otherwise lock the mutex and return @code{#t}."
- (lock-mutex mutex 0))
- ;;; Macros first, so that the procedures expand correctly.
- (define-syntax-rule (begin-thread e0 e1 ...)
- (call-with-new-thread
- (lambda () e0 e1 ...)
- %thread-handler))
- (define-syntax-rule (make-thread proc arg ...)
- (call-with-new-thread
- (lambda () (proc arg ...))
- %thread-handler))
- (define monitor-mutex-table (make-hash-table))
- (define monitor-mutex-table-mutex (make-mutex))
- (define (monitor-mutex-with-id id)
- (with-mutex monitor-mutex-table-mutex
- (or (hashq-ref monitor-mutex-table id)
- (let ((mutex (make-mutex)))
- (hashq-set! monitor-mutex-table id mutex)
- mutex))))
- (define-syntax monitor
- (lambda (stx)
- (syntax-case stx ()
- ((_ body body* ...)
- (let ((id (datum->syntax #'body (gensym))))
- #`(with-mutex (monitor-mutex-with-id '#,id)
- body body* ...))))))
- (define (thread-handler tag . args)
- (let ((n (length args))
- (p (current-error-port)))
- (display "In thread:" p)
- (newline p)
- (if (>= n 3)
- (display-error #f
- p
- (car args)
- (cadr args)
- (caddr args)
- (if (= n 4)
- (cadddr args)
- '()))
- (begin
- (display "uncaught throw to " p)
- (display tag p)
- (display ": " p)
- (display args p)
- (newline p)))
- #f))
- ;;; Set system thread handler
- (define %thread-handler thread-handler)
- (use-modules (ice-9 futures))
- (define-syntax parallel
- (lambda (x)
- (syntax-case x ()
- ((_ e0 ...)
- (with-syntax (((tmp0 ...) (generate-temporaries (syntax (e0 ...)))))
- #'(let ((tmp0 (future e0))
- ...)
- (values (touch tmp0) ...)))))))
- (define-syntax-rule (letpar ((v e) ...) b0 b1 ...)
- (call-with-values
- (lambda () (parallel e ...))
- (lambda (v ...)
- b0 b1 ...)))
- (define (par-mapper mapper cons)
- (lambda (proc . lists)
- (let loop ((lists lists))
- (match lists
- (((heads tails ...) ...)
- (let ((tail (future (loop tails)))
- (head (apply proc heads)))
- (cons head (touch tail))))
- (_
- '())))))
- (define par-map (par-mapper map cons))
- (define par-for-each (par-mapper for-each (const *unspecified*)))
- (define (n-par-map n proc . arglists)
- (let* ((m (make-mutex))
- (threads '())
- (results (make-list (length (car arglists))))
- (result results))
- (do ((i 0 (+ 1 i)))
- ((= i n)
- (for-each join-thread threads)
- results)
- (set! threads
- (cons (begin-thread
- (let loop ()
- (lock-mutex m)
- (if (null? result)
- (unlock-mutex m)
- (let ((args (map car arglists))
- (my-result result))
- (set! arglists (map cdr arglists))
- (set! result (cdr result))
- (unlock-mutex m)
- (set-car! my-result (apply proc args))
- (loop)))))
- threads)))))
- (define (n-par-for-each n proc . arglists)
- (let ((m (make-mutex))
- (threads '()))
- (do ((i 0 (+ 1 i)))
- ((= i n)
- (for-each join-thread threads))
- (set! threads
- (cons (begin-thread
- (let loop ()
- (lock-mutex m)
- (if (null? (car arglists))
- (unlock-mutex m)
- (let ((args (map car arglists)))
- (set! arglists (map cdr arglists))
- (unlock-mutex m)
- (apply proc args)
- (loop)))))
- threads)))))
- ;;; The following procedure is motivated by the common and important
- ;;; case where a lot of work should be done, (not too much) in parallel,
- ;;; but the results need to be handled serially (for example when
- ;;; writing them to a file).
- ;;;
- (define (n-for-each-par-map n s-proc p-proc . arglists)
- "Using N parallel processes, apply S-PROC in serial order on the results
- of applying P-PROC on ARGLISTS."
- (let* ((m (make-mutex))
- (threads '())
- (no-result '(no-value))
- (results (make-list (length (car arglists)) no-result))
- (result results))
- (do ((i 0 (+ 1 i)))
- ((= i n)
- (for-each join-thread threads))
- (set! threads
- (cons (begin-thread
- (let loop ()
- (lock-mutex m)
- (cond ((null? results)
- (unlock-mutex m))
- ((not (eq? (car results) no-result))
- (let ((arg (car results)))
- ;; stop others from choosing to process results
- (set-car! results no-result)
- (unlock-mutex m)
- (s-proc arg)
- (lock-mutex m)
- (set! results (cdr results))
- (unlock-mutex m)
- (loop)))
- ((null? result)
- (unlock-mutex m))
- (else
- (let ((args (map car arglists))
- (my-result result))
- (set! arglists (map cdr arglists))
- (set! result (cdr result))
- (unlock-mutex m)
- (set-car! my-result (apply p-proc args))
- (loop))))))
- threads)))))
- ;; Now that thread support is loaded, make module autoloading
- ;; thread-safe.
- (set! (@ (guile) call-with-module-autoload-lock)
- (let ((mutex (make-mutex 'recursive)))
- (lambda (thunk)
- (with-mutex mutex
- (thunk)))))
- ;;; threads.scm ends here
|