;;; channel.scm (3.0 KB)
  ;; Channel record: the meeting point shared by `send-rv' and `receive-rv'.
  ;;
  ;; Instances are built with `really-make-channel' (see `make-channel') and
  ;; recognised with `channel?'.
  ;;
  ;; NOTE(review): the `(priority)' clause is the field list handed to
  ;; `define-synchronized-record-type'; presumably it selects the fields whose
  ;; updates are synchronized -- confirm against the macro's definition.
  (define-synchronized-record-type channel :channel
    (really-make-channel priority in out)
    (priority)
    channel?
    ;; Value passed to `make-enabled' by the rendezvous procedures.  It is
    ;; incremented each time a poll finds a waiting counterpart and reset
    ;; to 1 when the enabled action actually runs.
    (priority channel-priority set-channel-priority!)
    ;; queue of trans-id * #f
    ;; (q-items enqueued by `receive-rv'; their message slot is #f)
    (in channel-in)
    ;; queue of trans-id * message
    ;; (q-items enqueued by `send-rv'; they carry the message being sent)
    (out channel-out))
  ;; Entry stored in a channel's `in' or `out' queue.  It bundles the three
  ;; arguments received by the blocking procedure given to `make-blocked'
  ;; (trans-id, cleanup-proc, wrap-proc) with the message being sent, or #f
  ;; when the entry was enqueued by `receive-rv'.
  (define-record-type q-item :q-item
    (make-q-item trans-id message cleanup-proc wrap-proc)
    q-item?
    ;; Transaction id of the waiting party; tested with `trans-id-cancelled?'
    ;; and completed with `trans-id-set-value!'.
    (trans-id q-item-trans-id)
    ;; Message to deliver (senders) or #f (receivers).
    (message q-item-message)
    ;; Procedure applied to the queue argument of the enabled action before
    ;; the waiting party's value is set.
    (cleanup-proc q-item-cleanup-proc)
    ;; Paired with the delivered value in the cons passed to
    ;; `trans-id-set-value!'.
    (wrap-proc q-item-wrap-proc))
  ;; Create a fresh channel: priority 1 and two new, empty queues (one for
  ;; waiting receivers, one for waiting senders).
  (define (make-channel)
    (let* ((receivers (make-queue))
           (senders (make-queue)))
      (really-make-channel 1 receivers senders)))
  ;; Channel equality is object identity: two channels are equal only when
  ;; they are the very same object (`eq?').
  (define (channel=? channel-1 channel-2)
    (eq? channel-1 channel-2))
  ;; Append VALUE to QUEUE, first discarding any cancelled entries sitting
  ;; at the head of the queue (via `clean-queue-head!').
  ;; Returns whatever `enqueue!' returns.
  (define (clean-and-enqueue! queue value)
    (clean-queue-head! queue)
    (enqueue! queue value))
  ;; Remove and return the first entry of QUEUE whose transaction has not
  ;; been cancelled.  Cancelled entries met on the way are dropped.
  ;; Returns #f when the queue runs out of entries.
  (define (clean-and-dequeue! queue)
    (let next ()
      (cond
       ((queue-empty? queue) #f)
       (else
        (let ((item (dequeue! queue)))
          (if (trans-id-cancelled? (q-item-trans-id item))
              (next)
              item))))))
  ;; Drop entries from the front of QUEUE for as long as the head entry's
  ;; transaction is cancelled.  Stops at the first live entry (which stays
  ;; in the queue) or when the queue is empty.  The return value is
  ;; unspecified.
  (define (clean-queue-head! queue)
    (let skip ()
      (if (and (not (queue-empty? queue))
               (trans-id-cancelled? (q-item-trans-id (queue-head queue))))
          (begin
            (dequeue! queue)
            (skip)))))
  ;; Build the rendezvous for sending MESSAGE over CHANNEL.
  ;;
  ;; The result is `(make-base thunk)'.  When the thunk is called it cleans
  ;; the head of the channel's `in' queue (waiting receivers) and then:
  ;;
  ;;  * no receiver waiting -> returns `(make-blocked proc)'; PROC, given
  ;;    (trans-id cleanup-proc wrap-proc), records this sender together with
  ;;    MESSAGE in the channel's `out' queue.
  ;;
  ;;  * a receiver waiting -> reads the current channel priority, bumps the
  ;;    stored priority by one, and returns `(make-enabled priority action)'.
  ;;    ACTION, given a queue, dequeues the receiver, resets the channel
  ;;    priority to 1, runs the receiver's cleanup-proc on that queue, sets
  ;;    the receiver's transaction value to (MESSAGE . wrap-proc), enqueues
  ;;    the receiver's thread cell, and returns `(unspecific)'.
  (define (send-rv channel message)
    (make-base
     (lambda ()
       (let ((receivers (channel-in channel)))
         (clean-queue-head! receivers)
         (cond
          ((queue-empty? receivers)
           (make-blocked
            (lambda (trans-id cleanup-proc wrap-proc)
              (clean-and-enqueue!
               (channel-out channel)
               (make-q-item trans-id message cleanup-proc wrap-proc)))))
          (else
           (let ((priority (channel-priority channel)))
             (set-channel-priority! channel (+ 1 priority))
             (make-enabled
              priority
              (lambda (queue)
                (let* ((receiver (dequeue! receivers))
                       (trans-id (q-item-trans-id receiver)))
                  (set-channel-priority! channel 1)
                  ((q-item-cleanup-proc receiver) queue)
                  (trans-id-set-value! trans-id
                                       (cons message
                                             (q-item-wrap-proc receiver)))
                  (enqueue! queue (trans-id-thread-cell trans-id))
                  (unspecific)))))))))))
  ;; Send MESSAGE over CHANNEL by synchronizing on the rendezvous built by
  ;; `send-rv'.  Returns whatever `sync' returns for that rendezvous.
  (define (send channel message)
    (let ((rendezvous (send-rv channel message)))
      (sync rendezvous)))
  ;; Build the rendezvous for receiving a message from CHANNEL.
  ;;
  ;; The result is `(make-base thunk)'.  When the thunk is called it cleans
  ;; the head of the channel's `out' queue (waiting senders) and then:
  ;;
  ;;  * no sender waiting -> returns `(make-blocked proc)'; PROC, given
  ;;    (trans-id cleanup-proc wrap-proc), records this receiver in the
  ;;    channel's `in' queue with #f in the message slot.
  ;;
  ;;  * a sender waiting -> reads the current channel priority, bumps the
  ;;    stored priority by one, and returns `(make-enabled priority action)'.
  ;;    ACTION, given a queue, dequeues the sender, resets the channel
  ;;    priority to 1, runs the sender's cleanup-proc on that queue, sets the
  ;;    sender's transaction value to ((unspecific) . wrap-proc), enqueues
  ;;    the sender's thread cell, and returns the sender's message.
  (define (receive-rv channel)
    (make-base
     (lambda ()
       (let ((senders (channel-out channel)))
         (clean-queue-head! senders)
         (cond
          ((queue-empty? senders)
           (make-blocked
            (lambda (trans-id cleanup-proc wrap-proc)
              (clean-and-enqueue!
               (channel-in channel)
               (make-q-item trans-id #f cleanup-proc wrap-proc)))))
          (else
           (let ((priority (channel-priority channel)))
             (set-channel-priority! channel (+ 1 priority))
             (make-enabled
              priority
              (lambda (queue)
                (let* ((sender (dequeue! senders))
                       (trans-id (q-item-trans-id sender)))
                  (set-channel-priority! channel 1)
                  ((q-item-cleanup-proc sender) queue)
                  (trans-id-set-value! trans-id
                                       (cons (unspecific)
                                             (q-item-wrap-proc sender)))
                  (enqueue! queue (trans-id-thread-cell trans-id))
                  (q-item-message sender)))))))))))
  ;; Receive from CHANNEL by synchronizing on the rendezvous built by
  ;; `receive-rv'.  Returns whatever `sync' returns for that rendezvous.
  (define (receive channel)
    (let ((rendezvous (receive-rv channel)))
      (sync rendezvous)))