channels.scm 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322
  1. ;; Channels
  2. ;;;; Copyright (C) 2016 Andy Wingo <wingo@pobox.com>
  3. ;;;; Copyright (C) 2017 Christopher Allan Webber <cwebber@dustycloud.org>
  4. ;;;;
  5. ;;;; This library is free software; you can redistribute it and/or
  6. ;;;; modify it under the terms of the GNU Lesser General Public
  7. ;;;; License as published by the Free Software Foundation; either
  8. ;;;; version 3 of the License, or (at your option) any later version.
  9. ;;;;
  10. ;;;; This library is distributed in the hope that it will be useful,
  11. ;;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. ;;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  13. ;;;; Lesser General Public License for more details.
  14. ;;;;
  15. ;;;; You should have received a copy of the GNU Lesser General Public
  16. ;;;; License along with this library; if not, write to the Free Software
  17. ;;;; Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  18. ;;; Channel implementation following the 2009 ICFP paper "Parallel
  19. ;;; Concurrent ML" by John Reppy, Claudio V. Russo, and Yingqi Xiao.
  20. ;;;
  21. ;;; Besides the general ways in which this implementation differs from
  22. ;;; the paper, this channel implementation avoids locks entirely.
  23. ;;; Still, we should disable interrupts while any operation is in a
  24. ;;; "claimed" state to avoid excess latency due to pre-emption. It
  25. ;;; would be great if we could verify our protocol though; the
  26. ;;; parallel channel operations are still gnarly.
  27. (define-module (fibers channels)
  28. #:use-module (srfi srfi-9)
  29. #:use-module (srfi srfi-9 gnu)
  30. #:use-module (ice-9 atomic)
  31. #:use-module (ice-9 match)
  32. #:use-module (fibers counter)
  33. #:use-module (fibers deque)
  34. #:use-module (fibers operations)
  35. #:export (make-channel
  36. channel?
  37. put-operation
  38. get-operation
  39. put-message
  40. get-message))
  ;; A channel is a rendezvous point between fibers.  It keeps two
  ;; queues of blocked operations, each held in an atomic box so that a
  ;; fiber can replace the whole queue with compare-and-swap:
  ;;
  ;;   getq -- receivers waiting for a sender; entries are
  ;;           #(flag resume) vectors (enqueued by get-operation).
  ;;   putq -- senders waiting for a receiver; entries are
  ;;           #(flag resume message) vectors (enqueued by put-operation).
  ;;
  ;; Each queue has a companion counter.  It is decremented every time an
  ;; entry is published on that queue; when it reaches zero, entries whose
  ;; flag is already 'S (synchronized) are filtered out and the counter is
  ;; reset.
  ;;
  ;; NOTE: put-operation and get-operation destructure channels with a
  ;; positional `($ <channel> ...)' match pattern, so the order of the
  ;; fields below must stay in sync with those patterns.
  (define-record-type <channel>
    (%make-channel getq getq-gc-counter putq putq-gc-counter)
    channel?
    ;; atomic box of deque
    (getq channel-getq)
    ;; counter that schedules periodic cleanup of getq
    (getq-gc-counter channel-getq-gc-counter)
    ;; atomic box of deque
    (putq channel-putq)
    ;; counter that schedules periodic cleanup of putq
    (putq-gc-counter channel-putq-gc-counter))
  (define (make-channel)
    "Make a fresh channel."
    ;; A new channel starts with two empty queues -- receivers (getq) and
    ;; senders (putq) -- each wrapped in its own atomic box and each
    ;; paired with a fresh counter that schedules cleanup of that queue.
    (let ((empty-queue-box (lambda () (make-atomic-box (make-empty-deque)))))
      (%make-channel (empty-queue-box) (make-counter)
                     (empty-queue-box) (make-counter))))
  (define (put-operation channel message)
    "Make an operation that if and when it completes will rendezvous
with a receiver fiber to send @var{message} over @var{channel}."
    ;; The operation is assembled from two procedures handed to
    ;; make-base-operation at the end of this form:
    ;;
    ;;   try-fn   -- tries to complete the put immediately by pairing it
    ;;               with a get operation already waiting in the getq.
    ;;               Returns a result thunk on success, or #f.
    ;;   block-fn -- runs once the current fiber has been suspended.  It
    ;;               publishes this put on the putq, then re-scans the
    ;;               getq for get operations it is still responsible for.
    ;;
    ;; Each queued operation carries a state flag (an atomic box) that
    ;; this code moves between three symbols:
    ;;
    ;;   'W -- not synchronized yet; may be claimed or completed.
    ;;   'C -- claimed: the op's own fiber is briefly trying to pair it
    ;;         with another op.  Observers spin, or reset and retry.
    ;;   'S -- synchronized: the op has completed.  Queue entries whose
    ;;         flag is 'S are dead and get dropped from the queues.
    ;;
    ;; NOTE(review): the leading #f passed to make-base-operation is
    ;; interpreted by (fibers operations); presumably it means "no result
    ;; wrapper" -- confirm there.
    (match channel
      (($ <channel> getq-box getq-gc-counter putq-box putq-gc-counter)
       ;; Only putq-gc-counter is used below; getq-gc-counter is bound just
       ;; to satisfy the positional record pattern.
       (define (try-fn)
         ;; Try to find and perform a pending get operation. If that
         ;; works, return a result thunk, or otherwise #f.
         (let try ((getq (atomic-box-ref getq-box)))
           ;; dequeue returns two values: the queue after removing its
           ;; head (#f when getq is empty) and the removed item.
           (call-with-values (lambda () (dequeue getq))
             (lambda (getq* item)
               (define (maybe-commit)
                 ;; Try to update getq. Return the new getq value in
                 ;; any case: getq* if our compare-and-swap won,
                 ;; otherwise whatever queue another fiber installed.
                 (let ((q (atomic-box-compare-and-swap! getq-box getq getq*)))
                   (if (eq? q getq) getq* q)))
               ;; Return #f if the getq was empty.
               (and getq*
                    (match item
                      (#(get-flag resume-get)
                       (let spin ()
                         (match (atomic-box-compare-and-swap! get-flag 'W 'S)
                           ('W
                            ;; Success. Commit the dequeue operation,
                            ;; unless the getq changed in the
                            ;; meantime. If we don't manage to commit
                            ;; the dequeue, some other put operation will
                            ;; commit it before it successfully
                            ;; performs any other operation on this
                            ;; channel.
                            (maybe-commit)
                            ;; Wake the receiver with a thunk that
                            ;; yields our message.
                            (resume-get (lambda () message))
                            ;; Continue directly; the put itself
                            ;; produces zero values.
                            (lambda () (values)))
                           ;; Get operation temporarily busy (claimed by
                           ;; its own fiber); busy-wait and try again.
                           ('C (spin))
                           ;; Get operation already performed; pop it
                           ;; off the getq (if we can) and try again.
                           ;; If we fail to commit, no big deal, we will
                           ;; try again next time if no other fiber
                           ;; handled it already.
                           ('S (try (maybe-commit))))))))))))
       (define (block-fn put-flag put-sched resume-put)
         ;; We have suspended the current fiber; arrange for the fiber
         ;; to be resumed by a get operation by adding it to the channel's
         ;; putq.  PUT-FLAG is this operation's state box and RESUME-PUT
         ;; wakes this fiber; PUT-SCHED is not referenced here.
         (define (not-me? item)
           ;; A getq entry holding the very same flag object as ours
           ;; belongs to the same blocked operation (presumably a choice
           ;; that also includes a get on this channel); pairing with it
           ;; would mean rendezvousing with ourselves, so skip it.
           (match item
             (#(get-flag resume-get)
              (not (eq? put-flag get-flag)))))
         ;; First, publish this put operation.
         (enqueue! putq-box (vector put-flag resume-put message))
         ;; Next, possibly clear off any garbage from queue: each
         ;; publication decrements the counter, and when it reaches zero
         ;; we drop entries that have already synchronized ('S) and
         ;; reset the counter.
         (when (= (counter-decrement! putq-gc-counter) 0)
           (dequeue-filter! putq-box
                            (match-lambda
                              (#(flag resume message)
                               (not (eq? (atomic-box-ref flag) 'S)))))
           (counter-reset! putq-gc-counter))
         ;; In the try phase, we scanned the getq for a get operation,
         ;; but we were unable to perform any of them. Since then,
         ;; there might be a new get operation on the queue. However
         ;; only get operations published *after* we publish our put
         ;; operation to the putq are responsible for trying to complete
         ;; this put operation; we are responsible for get operations
         ;; published before we published our put. Therefore, here we
         ;; visit the getq again. This is like the "try" phase, but
         ;; with the difference that we've published our op state flag
         ;; to the queue, so other fibers might be racing to synchronize
         ;; on our own op.
         (let service-get-ops ((getq (atomic-box-ref getq-box)))
           ;; Like try-fn's scan, but using not-me? so that an entry
           ;; sharing our own flag is never selected (presumably
           ;; dequeue-match yields the first entry satisfying it).
           (call-with-values (lambda () (dequeue-match getq not-me?))
             (lambda (getq* item)
               (define (maybe-commit)
                 ;; Try to update getq. Return the new getq value in
                 ;; any case.
                 (let ((q (atomic-box-compare-and-swap! getq-box getq getq*)))
                   (if (eq? q getq) getq* q)))
               ;; We only have to service the getq if it is non-empty.
               (when getq*
                 (match item
                   (#(get-flag resume-get)
                    (match (atomic-box-ref get-flag)
                      ('S
                       ;; This get operation has already synchronized;
                       ;; try to commit the dequeue operation and in any
                       ;; case try again.
                       (service-get-ops (maybe-commit)))
                      (_
                       ;; Our own op is now published on the putq, so
                       ;; another fiber may try to complete it at the
                       ;; same time.  Claiming our flag first ('W -> 'C)
                       ;; makes such fibers spin or retry, so the put
                       ;; cannot be completed twice.
                       ;; NOTE(review): the module commentary says
                       ;; interrupts should be disabled while an op is
                       ;; claimed; nothing here does so.
                       (let spin ()
                         (match (atomic-box-compare-and-swap! put-flag 'W 'C)
                           ('W
                            ;; We were able to claim our op. Now try to
                            ;; synchronize on a get operation as well.
                            (match (atomic-box-compare-and-swap! get-flag 'W 'S)
                              ('W
                               ;; It worked! Mark our own op as
                               ;; synchronized, try to commit the result
                               ;; getq, and resume both fibers: the
                               ;; receiver with a thunk yielding our
                               ;; message, ourselves with `values' (zero
                               ;; values).
                               (atomic-box-set! put-flag 'S)
                               (maybe-commit)
                               (resume-get (lambda () message))
                               (resume-put values)
                               (values))
                              ('C
                               ;; Other fiber trying to do the same
                               ;; thing we are; reset our state and try
                               ;; again.
                               ;; NOTE(review): two fibers in this state
                               ;; can each keep observing the other's 'C
                               ;; and retrying; progress depends on timing.
                               (atomic-box-set! put-flag 'W)
                               (spin))
                              ('S
                               ;; Other op already synchronized. Reset
                               ;; our flag, try to remove this dead
                               ;; entry from the getq, and give it
                               ;; another go.
                               (atomic-box-set! put-flag 'W)
                               (service-get-ops (maybe-commit)))))
                           (_
                            ;; Claiming our own op failed; this can only
                            ;; mean that some other fiber completed our
                            ;; op for us.
                            (values)))))))))))))
       (make-base-operation #f try-fn block-fn))))
  (define (get-operation channel)
    "Make an operation that if and when it completes will rendezvous
with a sender fiber to receive one value from @var{channel}."
    ;; Mirror image of put-operation:
    ;;
    ;;   try-fn   -- tries to pair immediately with a sender already
    ;;               waiting in the putq; returns a result thunk that
    ;;               yields the sender's message, or #f.
    ;;   block-fn -- runs once the current fiber has been suspended.  It
    ;;               publishes this get on the getq, then re-scans the
    ;;               putq for put operations it is still responsible for.
    ;;
    ;; Operation flags use the same three states: 'W (not yet
    ;; synchronized), 'C (claimed by the op's own fiber while it tries to
    ;; pair with another op) and 'S (synchronized; such queue entries are
    ;; dead and get dropped).
    (match channel
      (($ <channel> getq-box getq-gc-counter putq-box putq-gc-counter)
       ;; Only getq-gc-counter is used below; putq-gc-counter is bound just
       ;; to satisfy the positional record pattern.
       (define (try-fn)
         ;; Try to find and perform a pending put operation. If that
         ;; works, return a result thunk, or otherwise #f.
         (let try ((putq (atomic-box-ref putq-box)))
           ;; dequeue returns two values: the queue after removing its
           ;; head (#f when putq is empty) and the removed item.
           (call-with-values (lambda () (dequeue putq))
             (lambda (putq* item)
               (define (maybe-commit)
                 ;; Try to update putq. Return the new putq value in
                 ;; any case: putq* if our compare-and-swap won,
                 ;; otherwise whatever queue another fiber installed.
                 (let ((q (atomic-box-compare-and-swap! putq-box putq putq*)))
                   (if (eq? q putq) putq* q)))
               ;; Return #f if the putq was empty.
               (and putq*
                    (match item
                      (#(put-flag resume-put message)
                       (let spin ()
                         (match (atomic-box-compare-and-swap! put-flag 'W 'S)
                           ('W
                            ;; Success. Commit the fresh putq if we
                            ;; can. If we don't manage to commit right
                            ;; now, some other get operation will commit
                            ;; it before synchronizing any other
                            ;; operation on this channel.
                            (maybe-commit)
                            ;; Wake the sender; a put produces zero
                            ;; values.
                            (resume-put values)
                            ;; Continue directly, producing the
                            ;; sender's message as our result.
                            (lambda () message))
                           ;; Put operation temporarily busy (claimed by
                           ;; its own fiber); busy-wait and try again.
                           ('C (spin))
                           ;; Put operation already synchronized; pop it
                           ;; off the putq (if we can) and try again.
                           ;; If we fail to commit, no big deal, we will
                           ;; try again next time if no other fiber
                           ;; handled it already.
                           ('S (try (maybe-commit))))))))))))
       (define (block-fn get-flag get-sched resume-get)
         ;; We have suspended the current fiber; arrange for the fiber
         ;; to be resumed by a put operation by adding it to the
         ;; channel's getq.  GET-FLAG is this operation's state box and
         ;; RESUME-GET wakes this fiber; GET-SCHED is not referenced here.
         (define (not-me? item)
           ;; A putq entry holding the very same flag object as ours
           ;; belongs to the same blocked operation (presumably a choice
           ;; that also includes a put on this channel); pairing with it
           ;; would mean rendezvousing with ourselves, so skip it.
           (match item
             (#(put-flag resume-put message)
              (not (eq? get-flag put-flag)))))
         ;; First, publish this get operation.
         (enqueue! getq-box (vector get-flag resume-get))
         ;; Next, possibly clear off any garbage from queue: each
         ;; publication decrements the counter, and when it reaches zero
         ;; we drop entries that have already synchronized ('S) and
         ;; reset the counter.
         (when (= (counter-decrement! getq-gc-counter) 0)
           (dequeue-filter! getq-box
                            (match-lambda
                              (#(flag resume)
                               (not (eq? (atomic-box-ref flag) 'S)))))
           (counter-reset! getq-gc-counter))
         ;; In the try phase, we scanned the putq for a live put
         ;; operation, but we were unable to synchronize. Since then,
         ;; there might be a new operation on the putq. However only
         ;; put operations published *after* we publish our get
         ;; operation to the getq are responsible for trying to complete
         ;; this get operation; we are responsible for put operations
         ;; published before we published our get. Therefore, here we
         ;; visit the putq again. This is like the "try" phase, but
         ;; with the difference that we've published our op state flag
         ;; to the getq, so other fibers might be racing to synchronize
         ;; on our own op.
         (let service-put-ops ((putq (atomic-box-ref putq-box)))
           ;; Like try-fn's scan, but using not-me? so that an entry
           ;; sharing our own flag is never selected (presumably
           ;; dequeue-match yields the first entry satisfying it).
           (call-with-values (lambda () (dequeue-match putq not-me?))
             (lambda (putq* item)
               (define (maybe-commit)
                 ;; Try to update putq. Return the new putq value in
                 ;; any case.
                 (let ((q (atomic-box-compare-and-swap! putq-box putq putq*)))
                   (if (eq? q putq) putq* q)))
               ;; We only have to service the putq if it is non-empty.
               (when putq*
                 (match item
                   (#(put-flag resume-put message)
                    (match (atomic-box-ref put-flag)
                      ('S
                       ;; This put operation has already synchronized;
                       ;; try to commit the dequeue operation and in any
                       ;; case try again.
                       (service-put-ops (maybe-commit)))
                      (_
                       ;; Our own op is now published on the getq, so
                       ;; another fiber may try to complete it at the
                       ;; same time.  Claiming our flag first ('W -> 'C)
                       ;; makes such fibers spin or retry, so the get
                       ;; cannot be completed twice.
                       ;; NOTE(review): the module commentary says
                       ;; interrupts should be disabled while an op is
                       ;; claimed; nothing here does so.
                       (let spin ()
                         (match (atomic-box-compare-and-swap! get-flag 'W 'C)
                           ('W
                            ;; We were able to claim our op. Now try
                            ;; to synchronize on a put operation as well.
                            (match (atomic-box-compare-and-swap! put-flag 'W 'S)
                              ('W
                               ;; It worked! Mark our own op as
                               ;; synchronized, try to commit the put
                               ;; dequeue operation, and mark both
                               ;; fibers for resumption: ourselves with
                               ;; a thunk yielding the sender's message,
                               ;; the sender with `values' (zero values).
                               (atomic-box-set! get-flag 'S)
                               (maybe-commit)
                               (resume-get (lambda () message))
                               (resume-put values)
                               (values))
                              ('C
                               ;; Other fiber trying to do the same
                               ;; thing we are; reset our state and try
                               ;; again.
                               ;; NOTE(review): two fibers in this state
                               ;; can each keep observing the other's 'C
                               ;; and retrying; progress depends on timing.
                               (atomic-box-set! get-flag 'W)
                               (spin))
                              ('S
                               ;; Put op already synchronized. Reset
                               ;; get flag, try to remove this dead
                               ;; entry from the putq, and give it
                               ;; another go.
                               (atomic-box-set! get-flag 'W)
                               (service-put-ops (maybe-commit)))))
                           (_
                            ;; Claiming our own op failed; this can
                            ;; only mean that some other fiber
                            ;; completed our op for us.
                            (values)))))))))))))
       (make-base-operation #f try-fn block-fn))))
  (define (put-message channel message)
    "Send @var{message} on @var{channel}, and return zero values. If
there is already another fiber waiting to receive a message on this
channel, give it our message and continue. Otherwise, block until a
receiver becomes available."
    ;; Blocking convenience wrapper: build the put operation first, then
    ;; hand it to perform-operation, whose result is returned as-is.
    (let ((op (put-operation channel message)))
      (perform-operation op)))
  (define (get-message channel)
    "Receive a message from @var{channel} and return it. If there is
already another fiber waiting to send a message on this channel, take
its message directly. Otherwise, block until a sender becomes
available."
    ;; Blocking convenience wrapper: build the get operation first, then
    ;; hand it to perform-operation, whose result (the message) is
    ;; returned as-is.
    (let ((op (get-operation channel)))
      (perform-operation op)))