threads.scm 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393
  1. ;;;; Copyright (C) 1996, 1998, 2001, 2002, 2003, 2006, 2010, 2011,
  2. ;;;; 2012, 2018 Free Software Foundation, Inc.
  3. ;;;;
  4. ;;;; This library is free software; you can redistribute it and/or
  5. ;;;; modify it under the terms of the GNU Lesser General Public
  6. ;;;; License as published by the Free Software Foundation; either
  7. ;;;; version 3 of the License, or (at your option) any later version.
  8. ;;;;
  9. ;;;; This library is distributed in the hope that it will be useful,
  10. ;;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. ;;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  12. ;;;; Lesser General Public License for more details.
  13. ;;;;
  14. ;;;; You should have received a copy of the GNU Lesser General Public
  15. ;;;; License along with this library; if not, write to the Free Software
  16. ;;;; Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  17. ;;;;
  18. ;;;; ----------------------------------------------------------------
  19. ;;;; threads.scm -- User-level interface to Guile's thread system
  20. ;;;; 4 March 1996, Anthony Green <green@cygnus.com>
  21. ;;;; Modified 5 October 1996, MDJ <djurfeldt@nada.kth.se>
  22. ;;;; Modified 6 April 2001, ttn
  23. ;;;; ----------------------------------------------------------------
  24. ;;;;
  25. ;;; Commentary:
  26. ;; This module is documented in the Guile Reference Manual.
  27. ;;; Code:
  28. (define-module (ice-9 threads)
  29. #:use-module (ice-9 match)
  30. #:use-module (ice-9 control)
  31. ;; These bindings are marked as #:replace because when deprecated code
  32. ;; is enabled, (ice-9 deprecated) also exports these names.
  33. ;; (Referencing one of the deprecated names prints a warning directing
  34. ;; the user to these bindings.) Anyway once we can remove the
  35. ;; deprecated bindings, we should use #:export instead of #:replace
  36. ;; for these.
  37. #:replace (call-with-new-thread
  38. yield
  39. cancel-thread
  40. join-thread
  41. thread?
  42. make-mutex
  43. make-recursive-mutex
  44. lock-mutex
  45. try-mutex
  46. unlock-mutex
  47. mutex?
  48. mutex-owner
  49. mutex-level
  50. mutex-locked?
  51. make-condition-variable
  52. wait-condition-variable
  53. signal-condition-variable
  54. broadcast-condition-variable
  55. condition-variable?
  56. current-thread
  57. all-threads
  58. thread-exited?
  59. total-processor-count
  60. current-processor-count)
  61. #:export (begin-thread
  62. make-thread
  63. with-mutex
  64. monitor
  65. parallel
  66. letpar
  67. par-map
  68. par-for-each
  69. n-par-map
  70. n-par-for-each
  71. n-for-each-par-map
  72. %thread-handler))
  73. ;; Note that this extension also defines %make-transcoded-port, which is
  74. ;; not exported but is used by (rnrs io ports).
  75. (eval-when (expand eval load)
  76. (load-extension (string-append "libguile-" (effective-version))
  77. "scm_init_ice_9_threads"))
  78. (define-syntax-rule (with-mutex m e0 e1 ...)
  79. (let ((x m))
  80. (dynamic-wind
  81. (lambda () (lock-mutex x))
  82. (lambda () (begin e0 e1 ...))
  83. (lambda () (unlock-mutex x)))))
  84. (define cancel-tag (make-prompt-tag "cancel"))
  85. (define (cancel-thread thread . values)
  86. "Asynchronously interrupt the target @var{thread} and ask it to
  87. terminate, returning the given @var{values}. @code{dynamic-wind} post
  88. thunks will run, but throw handlers will not. If @var{thread} has
  89. already terminated or been signaled to terminate, this function is a
  90. no-op."
  91. (system-async-mark
  92. (lambda ()
  93. (catch #t
  94. (lambda ()
  95. (apply abort-to-prompt cancel-tag values))
  96. (lambda _
  97. (error "thread cancellation failed, throwing error instead???"))))
  98. thread))
  99. (define thread-join-data (make-object-property))
  100. (define %thread-results (make-object-property))
  101. (define* (call-with-new-thread thunk #:optional handler)
  102. "Call @code{thunk} in a new thread and with a new dynamic state,
  103. returning a new thread object representing the thread. The procedure
  104. @var{thunk} is called via @code{with-continuation-barrier}.
  105. When @var{handler} is specified, then @var{thunk} is called from within
  106. a @code{catch} with tag @code{#t} that has @var{handler} as its handler.
  107. This catch is established inside the continuation barrier.
  108. Once @var{thunk} or @var{handler} returns, the return value is made the
  109. @emph{exit value} of the thread and the thread is terminated."
  110. (let ((cv (make-condition-variable))
  111. (mutex (make-mutex))
  112. (thunk (if handler
  113. (lambda () (catch #t thunk handler))
  114. thunk))
  115. (thread #f))
  116. (define (call-with-backtrace thunk)
  117. (let ((err (current-error-port)))
  118. (catch #t
  119. (lambda () (%start-stack 'thread thunk))
  120. (lambda _ (values))
  121. (lambda (key . args)
  122. ;; Narrow by three: the dispatch-exception,
  123. ;; this thunk, and make-stack.
  124. (let ((stack (make-stack #t 3)))
  125. (false-if-exception
  126. (begin
  127. (when stack
  128. (display-backtrace stack err))
  129. (let ((frame (and stack (stack-ref stack 0))))
  130. (print-exception err frame key args)))))))))
  131. (with-mutex mutex
  132. (%call-with-new-thread
  133. (lambda ()
  134. (call-with-values
  135. (lambda ()
  136. (call-with-prompt cancel-tag
  137. (lambda ()
  138. (lock-mutex mutex)
  139. (set! thread (current-thread))
  140. (set! (thread-join-data thread) (cons cv mutex))
  141. (signal-condition-variable cv)
  142. (unlock-mutex mutex)
  143. (call-with-unblocked-asyncs
  144. (lambda () (call-with-backtrace thunk))))
  145. (lambda (k . args)
  146. (apply values args))))
  147. (lambda vals
  148. (lock-mutex mutex)
  149. ;; Probably now you're wondering why we are going to use
  150. ;; the cond variable as the key into the thread results
  151. ;; object property. It's because there is a possibility
  152. ;; that the thread object itself ends up as part of the
  153. ;; result, and if that happens we create a cycle whereby
  154. ;; the strong reference to a thread in the value of the
  155. ;; weak-key hash table used by the object property prevents
  156. ;; the thread from ever being collected. So instead we use
  157. ;; the cv as the key. Weak-key hash tables, amirite?
  158. (set! (%thread-results cv) vals)
  159. (broadcast-condition-variable cv)
  160. (unlock-mutex mutex)
  161. (apply values vals)))))
  162. (let lp ()
  163. (unless thread
  164. (wait-condition-variable cv mutex)
  165. (lp))))
  166. thread))
  167. (define* (join-thread thread #:optional timeout timeoutval)
  168. "Suspend execution of the calling thread until the target @var{thread}
  169. terminates, unless the target @var{thread} has already terminated."
  170. (match (thread-join-data thread)
  171. (#f (error "foreign thread cannot be joined" thread))
  172. ((cv . mutex)
  173. (lock-mutex mutex)
  174. (let lp ()
  175. (cond
  176. ((%thread-results cv)
  177. => (lambda (results)
  178. (unlock-mutex mutex)
  179. (apply values results)))
  180. ((if timeout
  181. (wait-condition-variable cv mutex timeout)
  182. (wait-condition-variable cv mutex))
  183. (lp))
  184. (else timeoutval))))))
  185. (define* (try-mutex mutex)
  186. "Try to lock @var{mutex}. If the mutex is already locked, return
  187. @code{#f}. Otherwise lock the mutex and return @code{#t}."
  188. (lock-mutex mutex 0))
  189. ;;; Macros first, so that the procedures expand correctly.
  190. (define-syntax-rule (begin-thread e0 e1 ...)
  191. (call-with-new-thread
  192. (lambda () e0 e1 ...)
  193. %thread-handler))
  194. (define-syntax-rule (make-thread proc arg ...)
  195. (call-with-new-thread
  196. (lambda () (proc arg ...))
  197. %thread-handler))
  198. (define monitor-mutex-table (make-hash-table))
  199. (define monitor-mutex-table-mutex (make-mutex))
  200. (define (monitor-mutex-with-id id)
  201. (with-mutex monitor-mutex-table-mutex
  202. (or (hashq-ref monitor-mutex-table id)
  203. (let ((mutex (make-mutex)))
  204. (hashq-set! monitor-mutex-table id mutex)
  205. mutex))))
  206. (define-syntax monitor
  207. (lambda (stx)
  208. (syntax-case stx ()
  209. ((_ body body* ...)
  210. (let ((id (datum->syntax #'body (gensym))))
  211. #`(with-mutex (monitor-mutex-with-id '#,id)
  212. body body* ...))))))
  213. (define (thread-handler tag . args)
  214. (let ((n (length args))
  215. (p (current-error-port)))
  216. (display "In thread:" p)
  217. (newline p)
  218. (if (>= n 3)
  219. (display-error #f
  220. p
  221. (car args)
  222. (cadr args)
  223. (caddr args)
  224. (if (= n 4)
  225. (cadddr args)
  226. '()))
  227. (begin
  228. (display "uncaught throw to " p)
  229. (display tag p)
  230. (display ": " p)
  231. (display args p)
  232. (newline p)))
  233. #f))
  234. ;;; Set system thread handler
  235. (define %thread-handler thread-handler)
  236. (use-modules (ice-9 futures))
  237. (define-syntax parallel
  238. (lambda (x)
  239. (syntax-case x ()
  240. ((_ e0 ...)
  241. (with-syntax (((tmp0 ...) (generate-temporaries (syntax (e0 ...)))))
  242. #'(let ((tmp0 (future e0))
  243. ...)
  244. (values (touch tmp0) ...)))))))
  245. (define-syntax-rule (letpar ((v e) ...) b0 b1 ...)
  246. (call-with-values
  247. (lambda () (parallel e ...))
  248. (lambda (v ...)
  249. b0 b1 ...)))
  250. (define (par-mapper mapper cons)
  251. (lambda (proc . lists)
  252. (let loop ((lists lists))
  253. (match lists
  254. (((heads tails ...) ...)
  255. (let ((tail (future (loop tails)))
  256. (head (apply proc heads)))
  257. (cons head (touch tail))))
  258. (_
  259. '())))))
  260. (define par-map (par-mapper map cons))
  261. (define par-for-each (par-mapper for-each (const *unspecified*)))
  262. (define (n-par-map n proc . arglists)
  263. (let* ((m (make-mutex))
  264. (threads '())
  265. (results (make-list (length (car arglists))))
  266. (result results))
  267. (do ((i 0 (+ 1 i)))
  268. ((= i n)
  269. (for-each join-thread threads)
  270. results)
  271. (set! threads
  272. (cons (begin-thread
  273. (let loop ()
  274. (lock-mutex m)
  275. (if (null? result)
  276. (unlock-mutex m)
  277. (let ((args (map car arglists))
  278. (my-result result))
  279. (set! arglists (map cdr arglists))
  280. (set! result (cdr result))
  281. (unlock-mutex m)
  282. (set-car! my-result (apply proc args))
  283. (loop)))))
  284. threads)))))
  285. (define (n-par-for-each n proc . arglists)
  286. (let ((m (make-mutex))
  287. (threads '()))
  288. (do ((i 0 (+ 1 i)))
  289. ((= i n)
  290. (for-each join-thread threads))
  291. (set! threads
  292. (cons (begin-thread
  293. (let loop ()
  294. (lock-mutex m)
  295. (if (null? (car arglists))
  296. (unlock-mutex m)
  297. (let ((args (map car arglists)))
  298. (set! arglists (map cdr arglists))
  299. (unlock-mutex m)
  300. (apply proc args)
  301. (loop)))))
  302. threads)))))
  303. ;;; The following procedure is motivated by the common and important
  304. ;;; case where a lot of work should be done, (not too much) in parallel,
  305. ;;; but the results need to be handled serially (for example when
  306. ;;; writing them to a file).
  307. ;;;
  308. (define (n-for-each-par-map n s-proc p-proc . arglists)
  309. "Using N parallel processes, apply S-PROC in serial order on the results
  310. of applying P-PROC on ARGLISTS."
  311. (let* ((m (make-mutex))
  312. (threads '())
  313. (no-result '(no-value))
  314. (results (make-list (length (car arglists)) no-result))
  315. (result results))
  316. (do ((i 0 (+ 1 i)))
  317. ((= i n)
  318. (for-each join-thread threads))
  319. (set! threads
  320. (cons (begin-thread
  321. (let loop ()
  322. (lock-mutex m)
  323. (cond ((null? results)
  324. (unlock-mutex m))
  325. ((not (eq? (car results) no-result))
  326. (let ((arg (car results)))
  327. ;; stop others from choosing to process results
  328. (set-car! results no-result)
  329. (unlock-mutex m)
  330. (s-proc arg)
  331. (lock-mutex m)
  332. (set! results (cdr results))
  333. (unlock-mutex m)
  334. (loop)))
  335. ((null? result)
  336. (unlock-mutex m))
  337. (else
  338. (let ((args (map car arglists))
  339. (my-result result))
  340. (set! arglists (map cdr arglists))
  341. (set! result (cdr result))
  342. (unlock-mutex m)
  343. (set-car! my-result (apply p-proc args))
  344. (loop))))))
  345. threads)))))
  346. ;; Now that thread support is loaded, make module autoloading
  347. ;; thread-safe.
  348. (set! (@ (guile) call-with-module-autoload-lock)
  349. (let ((mutex (make-mutex 'recursive)))
  350. (lambda (thunk)
  351. (with-mutex mutex
  352. (thunk)))))
  353. ;;; threads.scm ends here