rendezvous.scm 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690
  1. ; Part of Scheme 48 1.9. See file COPYING for notices and license.
  2. ; Authors: Mike Sperber
  3. ;; Rendezvous combinators
  4. ;; There's considerable potential for more abstraction in the protocol
  5. ;; for primitive rendezvous. --Mike
  6. (define-record-type prim-rv :prim-rv
  7. (really-make-prim-rv wrap-proc poll-thunk)
  8. prim-rv?
  9. (wrap-proc prim-rv-wrap-proc)
  10. ;; This is a thunk which checks whether the rendezvous is currently
  11. ;; enabled or not; if it is, the thunk must return an ENABLED
  12. ;; record, if it isn't, a BLOCKED record. It must perform only
  13. ;; provisional operations.
  14. (poll-thunk prim-rv-poll-thunk))
  15. (define (make-prim-rv poll-thunk)
  16. (really-make-prim-rv identity poll-thunk))
  17. (define-record-type prim-rv :enabled
  18. (make-enabled priority do-proc)
  19. enabled?
  20. (priority enabled-priority)
  21. ;; DO-PROC takes as its argument a queue of thread cells where it's
  22. ;; supposed to enqueue threads to be woken up upon commit. It must
  23. ;; perform only provisional operations.
  24. (do-proc enabled-do-proc))
  25. (define-record-type blocked :blocked
  26. (make-blocked proc)
  27. blocked?
  28. ;; PROC is a procedure with three arguments:
  29. ;; a TRANS-ID, a CLEANUP-PROC, and a WRAP-PROC.
  30. ;; TRANS-ID is the transaction ID of the blocked thread. The
  31. ;; TRANS-ID should be fed, when it's woken up, a pair consisting of
  32. ;; a return value and a wrap-proc procedure.
  33. ;; CLEANUP-PROC is a procedure which takes a thread queue as an
  34. ;; argument; it can enqueue thread cells whose threads it wants
  35. ;; woken up. This procedure is called from an operation which
  36. ;; enables this rendezvous. It must perform only provisional state
  37. ;; mutations.
  38. ;; WRAP-PROC is the complete, composed-together chain of WRAP
  39. ;; procedures of the event.
  40. (proc blocked-proc))
  41. (define-record-type base :base
  42. (really-make-base prim-rvs)
  43. base?
  44. (prim-rvs base-prim-rvs))
  45. (define (make-base poll-thunk)
  46. (really-make-base (list (make-prim-rv poll-thunk))))
  47. (define-record-type choose :choose
  48. (make-choose rvs)
  49. choose?
  50. (rvs choose-rvs))
  51. (define-record-type guard :guard
  52. (make-guard thunk)
  53. guard?
  54. (thunk guard-thunk))
  55. (define-record-type with-nack :with-nack
  56. (make-nack proc)
  57. nack?
  58. (proc nack-proc))
  59. ;; Condition variables for internal use
  60. ;; These are essentially for placeholder-like rendezvous, only they
  61. ;; don't communicate a value.
  62. (define-synchronized-record-type cvar :cvar
  63. (really-make-cvar state)
  64. cvar?
  65. ;; this can be one of the two below:
  66. (state cvar-state set-cvar-state!))
  67. (define-synchronized-record-type cvar-unset-state :cvar-unset-state
  68. (make-cvar-unset-state blocked)
  69. cvar-unset-state?
  70. ;; this is a list of :CVAR-ITEM
  71. (blocked cvar-unset-state-blocked set-cvar-unset-state-blocked!))
  72. (define-record-type cvar-item :cvar-item
  73. (make-cvar-item trans-id cleanup-proc wrap-proc)
  74. cvar-item?
  75. (trans-id cvar-item-trans-id)
  76. (cleanup-proc cvar-item-cleanup-proc)
  77. (wrap-proc cvar-item-wrap-proc))
  78. (define-synchronized-record-type cvar-set-state :cvar-set-state
  79. (make-cvar-set-state priority)
  80. cvar-set-state?
  81. (priority cvar-set-state-priority set-cvar-set-state-priority!))
  82. (define (make-cvar)
  83. (really-make-cvar (make-cvar-unset-state '())))
  84. ;; Note that this happens during synchronization which is why the
  85. ;; QUEUE argument is there.
  86. (define (cvar-set! cvar queue)
  87. (let ((state (cvar-state cvar)))
  88. (cond
  89. ((cvar-unset-state? state)
  90. (for-each (lambda (cvar-item)
  91. (let ((trans-id (cvar-item-trans-id cvar-item)))
  92. (if (not (trans-id-cancelled? trans-id))
  93. (begin
  94. ((cvar-item-cleanup-proc cvar-item) queue)
  95. (trans-id-set-value! trans-id
  96. (cons (unspecific)
  97. (cvar-item-wrap-proc cvar-item)))
  98. (enqueue! queue (trans-id-thread-cell trans-id))))))
  99. (cvar-unset-state-blocked state))
  100. (set-cvar-state! cvar (make-cvar-set-state 1)))
  101. (else
  102. (assertion-violation 'cvar-set! "cvar already set")))))
  103. (define (cvar-get-rv cvar)
  104. (make-base
  105. (lambda ()
  106. (let ((state (cvar-state cvar)))
  107. (cond
  108. ((cvar-set-state? state)
  109. (let ((priority (cvar-set-state-priority state)))
  110. (set-cvar-set-state-priority! state (+ 1 priority))
  111. (make-enabled priority
  112. (lambda (queue)
  113. (set-cvar-set-state-priority! state 1)
  114. (unspecific)))))
  115. (else
  116. (make-blocked
  117. (lambda (trans-id cleanup-proc wrap-proc)
  118. (set-cvar-unset-state-blocked!
  119. state
  120. (cons (make-cvar-item trans-id cleanup-proc wrap-proc)
  121. (cvar-unset-state-blocked state)))))))))))
  122. (define (always-rv value)
  123. (make-base
  124. (lambda ()
  125. (make-enabled -1
  126. (lambda (queue)
  127. value)))))
  128. (define never-rv (really-make-base '()))
  129. (define (guard thunk)
  130. (make-guard thunk))
  131. (define (with-nack proc)
  132. (make-nack proc))
  133. (define (gather-prim-rvs rev-rvs prim-rvs)
  134. (cond
  135. ((null? rev-rvs) (really-make-base prim-rvs))
  136. ((not (base? (car rev-rvs)))
  137. (if (null? prim-rvs)
  138. (gather rev-rvs '())
  139. (gather rev-rvs (list (really-make-base prim-rvs)))))
  140. ;; (car rev-rvs) is base
  141. (else
  142. (gather-prim-rvs (cdr rev-rvs)
  143. (append (base-prim-rvs (car rev-rvs))
  144. prim-rvs)))))
  145. (define (gather rev-rvs rvs)
  146. (cond
  147. ((not (null? rev-rvs))
  148. (let ((rv (car rev-rvs)))
  149. (cond
  150. ((choose? rv)
  151. (gather (cdr rev-rvs) (append (choose-rvs rv) rvs)))
  152. ((and (base? rv)
  153. (not (null? rvs))
  154. (base? (car rvs)))
  155. (gather (cdr rev-rvs)
  156. (cons (really-make-base (append (base-prim-rvs rv)
  157. (base-prim-rvs (car rvs))))
  158. (cdr rvs))))
  159. (else
  160. (gather (cdr rev-rvs) (cons rv rvs))))))
  161. ((null? (cdr rvs)) (car rvs))
  162. (else (make-choose rvs))))
  163. (define (choose . rvs)
  164. (gather-prim-rvs (reverse rvs) '()))
  165. (define (compose f g)
  166. (lambda (x)
  167. (f (g x))))
  168. (define (wrap-prim-rv prim-rv wrap-proc)
  169. (really-make-prim-rv (compose wrap-proc
  170. (prim-rv-wrap-proc prim-rv))
  171. (prim-rv-poll-thunk prim-rv)))
  172. (define (wrap rv wrap-proc)
  173. (cond
  174. ((base? rv)
  175. (really-make-base (map (lambda (prim-rv)
  176. (wrap-prim-rv prim-rv wrap-proc))
  177. (base-prim-rvs rv))))
  178. ((choose? rv)
  179. (make-choose (map (lambda (rv)
  180. (wrap rv wrap-proc))
  181. (choose-rvs rv))))
  182. ((guard? rv)
  183. (make-guard (lambda ()
  184. (wrap ((guard-thunk rv)) wrap-proc))))
  185. ((nack? rv)
  186. (make-nack (lambda (nack-rv)
  187. (wrap ((nack-proc rv) nack-rv) wrap-proc))))))
  188. (define-record-type base-group :base-group
  189. (really-make-base-group prim-rvs)
  190. base-group?
  191. (prim-rvs base-group-prim-rvs))
  192. (define-record-discloser :base-group
  193. (lambda (base-group)
  194. (cons 'base-group
  195. (base-group-prim-rvs base-group))))
  196. (define-record-type choose-group :choose-group
  197. (make-choose-group groups)
  198. choose-group?
  199. (groups choose-group-groups))
  200. (define-record-discloser :choose-group
  201. (lambda (choose-group)
  202. (cons 'choose-group
  203. (choose-group-groups choose-group))))
  204. (define-record-type nack-group :nack-group
  205. (make-nack-group cvar group)
  206. nack-group?
  207. (cvar nack-group-cvar)
  208. (group nack-group-group))
  209. (define-record-discloser :nack-group
  210. (lambda (nack-group)
  211. (list 'nack-group
  212. (nack-group-group nack-group))))
  213. (define (force-rv rv)
  214. (cond
  215. ((base? rv)
  216. (really-make-base-group (base-prim-rvs rv)))
  217. (else
  218. (really-force-rv rv))))
  219. (define (force-prim-rvs rvs prim-rvs)
  220. (if (null? rvs)
  221. (really-make-base-group prim-rvs)
  222. (let* ((rv (car rvs))
  223. (group (really-force-rv rv)))
  224. (cond
  225. ((base-group? group)
  226. (force-prim-rvs (cdr rvs)
  227. (append (base-group-prim-rvs group)
  228. prim-rvs)))
  229. ((choose-group? group)
  230. (force-rvs (cdr rvs)
  231. (append (choose-group-groups group)
  232. (list (really-make-base-group prim-rvs)))))
  233. (else
  234. (force-rvs (cdr rvs)
  235. (list group (really-make-base-group prim-rvs))))))))
  236. (define (force-rvs rvs groups)
  237. (cond
  238. ((not (null? rvs))
  239. (let* ((rv (car rvs))
  240. (group (really-force-rv rv)))
  241. (cond
  242. ((and (base-group? group)
  243. (not (null? groups))
  244. (base-group? (car groups)))
  245. (force-rvs (cdr rvs)
  246. (cons (really-make-base-group
  247. (append (base-group-prim-rvs group)
  248. (base-group-prim-rvs (car groups))))
  249. (cdr groups))))
  250. ((choose-group? group)
  251. (force-rvs (cdr rvs)
  252. (append (choose-group-groups group)
  253. groups)))
  254. (else
  255. (force-rvs (cdr rvs) (cons group groups))))))
  256. ((null? (cdr groups))
  257. (car groups))
  258. (else
  259. (make-choose-group groups))))
  260. ;; this corresponds to force' in Reppy's implementation
  261. (define (really-force-rv rv)
  262. (cond
  263. ((guard? rv)
  264. (really-force-rv ((guard-thunk rv))))
  265. ((nack? rv)
  266. (let ((cvar (make-cvar)))
  267. (make-nack-group cvar
  268. (really-force-rv
  269. ((nack-proc rv)
  270. (cvar-get-rv cvar))))))
  271. ((base? rv)
  272. (really-make-base-group (base-prim-rvs rv)))
  273. ((choose? rv)
  274. (force-prim-rvs (choose-rvs rv) '()))
  275. (else
  276. (assertion-violation 'really-force-rv "not a rendezvous" rv))))
  277. (define (sync-prim-rv prim-rv)
  278. (let ((poll-thunk (prim-rv-poll-thunk prim-rv))
  279. (wrap-proc (prim-rv-wrap-proc prim-rv)))
  280. (let ((old (current-proposal)))
  281. (let lose ()
  282. (set-current-proposal! (make-proposal))
  283. (let ((status ((prim-rv-poll-thunk prim-rv))))
  284. (cond
  285. ((enabled? status)
  286. (let* ((queue (make-queue))
  287. (value ((enabled-do-proc status) queue)))
  288. (if (maybe-commit-and-make-ready queue)
  289. (begin
  290. (set-current-proposal! old)
  291. (wrap-proc value))
  292. (lose))))
  293. ((blocked? status)
  294. (let ((trans-id (make-trans-id)))
  295. ((blocked-proc status) trans-id values wrap-proc)
  296. (cond
  297. ((maybe-commit-and-trans-id-value trans-id)
  298. => (lambda (pair)
  299. (set-current-proposal! old)
  300. ((cdr pair) (car pair))))
  301. (else
  302. (lose)))))))))))
  303. (define (select-do-proc priority+do-list n)
  304. (cond
  305. ((null? (cdr priority+do-list))
  306. (cdar priority+do-list))
  307. (else
  308. (let ((priority
  309. (lambda (p)
  310. (if (= p -1)
  311. n
  312. p))))
  313. (let max ((rest priority+do-list)
  314. (maximum 0)
  315. (k 0) ; (length do-procs)
  316. (do-list '())) ; #### list of pairs do-proc * wrap-proc
  317. (cond
  318. ((not (null? rest))
  319. (let* ((pair (car rest))
  320. (p (priority (car pair)))
  321. (stuff (cdr pair)))
  322. (cond
  323. ((> p maximum)
  324. (max (cdr rest) p 1 (list stuff)))
  325. ((= p maximum)
  326. (max (cdr rest) maximum (+ 1 k) (cons stuff do-list)))
  327. (else
  328. (max (cdr rest) maximum k do-list)))))
  329. ((null? (cdr do-list))
  330. (car do-list))
  331. (else
  332. ;; List.nth(doFns, random k)
  333. (car do-list))))))))
  334. (define (block)
  335. (with-new-proposal (lose)
  336. (maybe-commit-and-block (make-cell (current-thread)))))
  337. (define (sync-prim-rvs prim-rvs)
  338. (cond
  339. ((null? prim-rvs)
  340. (block))
  341. ((null? (cdr prim-rvs)) (sync-prim-rv (car prim-rvs)))
  342. (else
  343. (let ((old (current-proposal)))
  344. (let lose ()
  345. (define (find-enabled prim-rvs block-procs wrap-procs)
  346. (if (null? prim-rvs)
  347. (let ((trans-id (make-trans-id)))
  348. (for-each (lambda (block-proc wrap-proc)
  349. (block-proc trans-id values wrap-proc))
  350. block-procs wrap-procs)
  351. (cond
  352. ((maybe-commit-and-trans-id-value trans-id)
  353. => (lambda (pair)
  354. (set-current-proposal! old)
  355. ((cdr pair) (car pair))))
  356. (else
  357. (lose))))
  358. (let* ((prim-rv (car prim-rvs))
  359. (poll-thunk (prim-rv-poll-thunk prim-rv))
  360. (wrap-proc (prim-rv-wrap-proc prim-rv))
  361. (status (poll-thunk)))
  362. (cond
  363. ((enabled? status)
  364. (handle-enabled (cdr prim-rvs)
  365. (list
  366. (cons (enabled-priority status)
  367. (cons (enabled-do-proc status)
  368. wrap-proc)))
  369. 1))
  370. ((blocked? status)
  371. (find-enabled (cdr prim-rvs)
  372. (cons (blocked-proc status)
  373. block-procs)
  374. (cons wrap-proc wrap-procs)))))))
  375. (define (handle-enabled prim-rvs priority+do-list priority)
  376. (if (null? prim-rvs)
  377. (let* ((stuff (select-do-proc priority+do-list priority))
  378. (do-proc (car stuff))
  379. (wrap-proc (cdr stuff))
  380. (queue (make-queue))
  381. (value (do-proc queue)))
  382. (if (maybe-commit-and-make-ready queue)
  383. (begin
  384. (set-current-proposal! old)
  385. (wrap-proc value))
  386. (lose)))
  387. (let* ((prim-rv (car prim-rvs))
  388. (poll-thunk (prim-rv-poll-thunk prim-rv))
  389. (wrap-proc (prim-rv-wrap-proc prim-rv))
  390. (status (poll-thunk)))
  391. (cond
  392. ((enabled? status)
  393. (handle-enabled (cdr prim-rvs)
  394. (cons (cons (enabled-priority status)
  395. (cons (enabled-do-proc status)
  396. wrap-proc))
  397. priority+do-list)
  398. (+ 1 priority)))
  399. (else
  400. (handle-enabled (cdr prim-rvs)
  401. priority+do-list
  402. priority))))))
  403. (set-current-proposal! (make-proposal))
  404. (find-enabled prim-rvs '() '()))))))
  405. (define (sync rv)
  406. (let ((group (force-rv rv)))
  407. (cond
  408. ((base-group? group)
  409. (sync-prim-rvs (base-group-prim-rvs group)))
  410. (else
  411. (sync-group group)))))
  412. (define-record-type ack-flag :ack-flag
  413. (really-make-ack-flag acked?)
  414. ack-flag?
  415. (acked? flag-acked? set-flag-acked?!))
  416. (define (make-ack-flag)
  417. (really-make-ack-flag #f))
  418. (define (ack-flag! ack-flag)
  419. (set-flag-acked?! ack-flag #t))
  420. (define-record-type flag-set :flag-set
  421. (make-flag-set cvar ack-flags)
  422. flag-set?
  423. (cvar flag-set-cvar)
  424. (ack-flags flag-set-ack-flags))
  425. (define (check-cvars! flag-sets queue)
  426. (for-each (lambda (flag-set)
  427. (check-cvar! flag-set queue))
  428. flag-sets))
  429. (define (check-cvar! flag-set queue)
  430. (let loop ((ack-flags (flag-set-ack-flags flag-set)))
  431. (cond
  432. ((null? ack-flags)
  433. (cvar-set! (flag-set-cvar flag-set) queue))
  434. ((flag-acked? (car ack-flags))
  435. (values))
  436. (else
  437. (loop (cdr ack-flags))))))
  438. ;; this corresponds to syncOnGrp from Reppy's code
  439. (define (sync-group group)
  440. (call-with-values
  441. (lambda () (collect-group group))
  442. really-sync-group))
  443. ;; This is analogous to SYNC-PRIM-RVS
  444. (define (really-sync-group prim-rv+ack-flag-list flag-sets)
  445. (let ((old (current-proposal)))
  446. (let lose ()
  447. (define (find-enabled prim-rv+ack-flag-list
  448. block-proc+ack-flag-list
  449. wrap-procs)
  450. (if (null? prim-rv+ack-flag-list)
  451. (let ((trans-id (make-trans-id)))
  452. (for-each (lambda (block-proc+ack-flag wrap-proc)
  453. (let ((block-proc (car block-proc+ack-flag))
  454. (ack-flag (cdr block-proc+ack-flag)))
  455. (block-proc trans-id
  456. (lambda (queue)
  457. (ack-flag! ack-flag)
  458. (check-cvars! flag-sets queue))
  459. wrap-proc)))
  460. block-proc+ack-flag-list wrap-procs)
  461. (cond
  462. ((maybe-commit-and-trans-id-value trans-id)
  463. => (lambda (pair)
  464. (set-current-proposal! old)
  465. ((cdr pair) (car pair))))
  466. (else
  467. (lose))))
  468. (let* ((prim-rv (caar prim-rv+ack-flag-list))
  469. (ack-flag (cdar prim-rv+ack-flag-list))
  470. (poll-thunk (prim-rv-poll-thunk prim-rv))
  471. (wrap-proc (prim-rv-wrap-proc prim-rv))
  472. (status (poll-thunk)))
  473. (cond
  474. ((enabled? status)
  475. (handle-enabled (cdr prim-rv+ack-flag-list)
  476. (list
  477. (cons (enabled-priority status)
  478. (cons (cons (enabled-do-proc status) ack-flag)
  479. wrap-proc)))
  480. 1))
  481. ((blocked? status)
  482. (find-enabled (cdr prim-rv+ack-flag-list)
  483. (cons (cons (blocked-proc status) ack-flag)
  484. block-proc+ack-flag-list)
  485. (cons wrap-proc wrap-procs)))))))
  486. (define (handle-enabled prim-rv+ack-flag-list priority+do-list priority)
  487. (if (null? prim-rv+ack-flag-list)
  488. (let* ((stuff (select-do-proc priority+do-list priority))
  489. (more-stuff (car stuff))
  490. (do-proc (car more-stuff))
  491. (ack-flag (cdr more-stuff))
  492. (wrap-proc (cdr stuff))
  493. (queue (make-queue)))
  494. (ack-flag! ack-flag)
  495. (check-cvars! flag-sets queue)
  496. (let ((value (do-proc queue)))
  497. (if (maybe-commit-and-make-ready queue)
  498. (begin
  499. (set-current-proposal! old)
  500. (wrap-proc value))
  501. (lose))))
  502. (let* ((prim-rv+ack-flag (car prim-rv+ack-flag-list))
  503. (prim-rv (car prim-rv+ack-flag))
  504. (ack-flag (cdr prim-rv+ack-flag))
  505. (poll-thunk (prim-rv-poll-thunk prim-rv))
  506. (wrap-proc (prim-rv-wrap-proc prim-rv))
  507. (status (poll-thunk)))
  508. (cond
  509. ((enabled? status)
  510. (handle-enabled (cdr prim-rv+ack-flag-list)
  511. (cons (cons (enabled-priority status)
  512. (cons (cons (enabled-do-proc status) ack-flag)
  513. wrap-proc))
  514. priority+do-list)
  515. (+ 1 priority)))
  516. (else
  517. (handle-enabled (cdr prim-rv+ack-flag-list)
  518. priority+do-list
  519. priority))))))
  520. (set-current-proposal! (make-proposal))
  521. (find-enabled prim-rv+ack-flag-list '() '()))))
  522. (define (collect-group group)
  523. (cond
  524. ((choose-group? group)
  525. (gather-choose-group group))
  526. (else
  527. (gather-wrapped group '() '()))))
  528. (define (gather-choose-group group)
  529. (let ((ack-flag (make-ack-flag)))
  530. (let gather ((group group)
  531. (prim-rv+ack-flag-list '())
  532. (flag-sets '()))
  533. (cond
  534. ((base-group? group)
  535. (let append ((prim-rvs (base-group-prim-rvs group))
  536. (prim-rv+ack-flag-list prim-rv+ack-flag-list))
  537. (if (null? prim-rvs)
  538. (values prim-rv+ack-flag-list flag-sets)
  539. (append (cdr prim-rvs)
  540. (cons (cons (car prim-rvs) ack-flag)
  541. prim-rv+ack-flag-list)))))
  542. ((choose-group? group)
  543. ;; fold-left
  544. (let loop ((groups (choose-group-groups group))
  545. (prim-rv+ack-flag-list prim-rv+ack-flag-list)
  546. (flag-sets flag-sets))
  547. (if (null? groups)
  548. (values prim-rv+ack-flag-list flag-sets)
  549. (call-with-values
  550. (lambda ()
  551. (gather (car groups)
  552. prim-rv+ack-flag-list
  553. flag-sets))
  554. (lambda (prim-rv+ack-flag-list flag-sets)
  555. (loop (cdr groups)
  556. prim-rv+ack-flag-list
  557. flag-sets))))))
  558. ((nack-group? group)
  559. (gather-wrapped group prim-rv+ack-flag-list flag-sets))))))
  560. (define (gather-wrapped group prim-rv+ack-flag-list flag-sets)
  561. (call-with-values
  562. (lambda ()
  563. (let gather ((group group)
  564. (prim-rv+ack-flag-list prim-rv+ack-flag-list)
  565. (all-flags '())
  566. (flag-sets flag-sets))
  567. (cond
  568. ((base-group? group)
  569. (let append ((prim-rvs (base-group-prim-rvs group))
  570. (prim-rv+ack-flag-list prim-rv+ack-flag-list)
  571. (all-flags all-flags))
  572. (if (null? prim-rvs)
  573. (values prim-rv+ack-flag-list
  574. all-flags
  575. flag-sets)
  576. (let ((ack-flag (make-ack-flag)))
  577. (append (cdr prim-rvs)
  578. (cons (cons (car prim-rvs) ack-flag)
  579. prim-rv+ack-flag-list)
  580. (cons ack-flag all-flags))))))
  581. ((choose-group? group)
  582. ;; fold-left
  583. (let loop ((groups (choose-group-groups group))
  584. (prim-rv+ack-flag-list prim-rv+ack-flag-list)
  585. (all-flags all-flags)
  586. (flag-sets flag-sets))
  587. (if (null? groups)
  588. (values prim-rv+ack-flag-list
  589. all-flags
  590. flag-sets)
  591. (call-with-values
  592. (lambda ()
  593. (gather (car groups)
  594. prim-rv+ack-flag-list
  595. all-flags
  596. flag-sets))
  597. (lambda (prim-rv+ack-flag-list all-flags flag-sets)
  598. (loop (cdr groups)
  599. prim-rv+ack-flag-list all-flags flag-sets))))))
  600. ((nack-group? group)
  601. (call-with-values
  602. (lambda ()
  603. (gather (nack-group-group group)
  604. prim-rv+ack-flag-list
  605. '()
  606. flag-sets))
  607. (lambda (prim-rv+ack-flag-list all-flags-new flag-sets)
  608. (values prim-rv+ack-flag-list
  609. (append all-flags-new all-flags)
  610. (cons (make-flag-set (nack-group-cvar group)
  611. all-flags-new)
  612. flag-sets))))))))
  613. (lambda (prim-rv+ack-flag-list all-flags flag-sets)
  614. (values prim-rv+ack-flag-list flag-sets))))
  615. (define (select . rvs)
  616. (sync (apply choose rvs)))