channel.scm 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. ; Part of Scheme 48 1.9. See file COPYING for notices and license.
  2. ; Authors: Mike Sperber
  3. (define-synchronized-record-type channel :channel
  4. (really-make-channel priority in out)
  5. (priority)
  6. channel?
  7. (priority channel-priority set-channel-priority!)
  8. ;; queue of trans-id * #f
  9. (in channel-in)
  10. ;; queue of trans-id * message
  11. (out channel-out))
  12. (define-record-type q-item :q-item
  13. (make-q-item trans-id message cleanup-proc wrap-proc)
  14. q-item?
  15. (trans-id q-item-trans-id)
  16. (message q-item-message)
  17. (cleanup-proc q-item-cleanup-proc)
  18. (wrap-proc q-item-wrap-proc))
  19. (define (make-channel)
  20. (really-make-channel 1 (make-queue) (make-queue)))
  21. (define (channel=? channel-1 channel-2)
  22. (eq? channel-1 channel-2))
  23. (define (clean-and-enqueue! queue value)
  24. (clean-queue-head! queue)
  25. (enqueue! queue value))
  26. (define (clean-and-dequeue! queue)
  27. (let loop ()
  28. (if (queue-empty? queue)
  29. #f
  30. (let ((front (dequeue! queue)))
  31. (if (trans-id-cancelled? (q-item-trans-id front))
  32. (loop)
  33. front)))))
  34. (define (clean-queue-head! queue)
  35. (let loop ()
  36. (if (not (queue-empty? queue))
  37. (let ((front (queue-head queue)))
  38. (if (trans-id-cancelled? (q-item-trans-id front))
  39. (begin
  40. (dequeue! queue)
  41. (loop)))))))
  42. (define (send-rv channel message)
  43. (make-base
  44. (lambda ()
  45. (let ((in (channel-in channel)))
  46. (clean-queue-head! in)
  47. (if (queue-empty? in)
  48. (make-blocked (lambda (trans-id cleanup-proc wrap-proc)
  49. (clean-and-enqueue! (channel-out channel)
  50. (make-q-item trans-id
  51. message
  52. cleanup-proc
  53. wrap-proc))))
  54. (let ((priority (channel-priority channel)))
  55. (set-channel-priority! channel (+ 1 priority))
  56. (make-enabled
  57. priority
  58. (lambda (queue)
  59. (let ((q-item (dequeue! in)))
  60. (set-channel-priority! channel 1)
  61. ((q-item-cleanup-proc q-item) queue)
  62. (let ((trans-id (q-item-trans-id q-item)))
  63. (trans-id-set-value! trans-id
  64. (cons message
  65. (q-item-wrap-proc q-item)))
  66. (enqueue! queue (trans-id-thread-cell trans-id)))
  67. (unspecific))))))))))
  68. (define (send channel message)
  69. (sync (send-rv channel message)))
  70. (define (receive-rv channel)
  71. (make-base
  72. (lambda ()
  73. (let ((out (channel-out channel)))
  74. (clean-queue-head! out)
  75. (if (queue-empty? out)
  76. (make-blocked (lambda (trans-id cleanup-proc wrap-proc)
  77. (clean-and-enqueue! (channel-in channel)
  78. (make-q-item trans-id
  79. #f
  80. cleanup-proc
  81. wrap-proc))))
  82. (let ((priority (channel-priority channel)))
  83. (set-channel-priority! channel (+ 1 priority))
  84. (make-enabled
  85. priority
  86. (lambda (queue)
  87. (let ((q-item (dequeue! out)))
  88. (set-channel-priority! channel 1)
  89. ((q-item-cleanup-proc q-item) queue)
  90. (let ((trans-id (q-item-trans-id q-item)))
  91. (trans-id-set-value! trans-id
  92. (cons (unspecific)
  93. (q-item-wrap-proc q-item)))
  94. (enqueue! queue (trans-id-thread-cell trans-id)))
  95. (q-item-message q-item))))))))))
  96. (define (receive channel)
  97. (sync (receive-rv channel)))