pipe.scm 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416
  1. ; Part of Scheme 48 1.9. See file COPYING for notices and license.
  2. ; Authors: Richard Kelsey, Jonathan Rees, Robert Ransom
  3. ; Pipes.
  4. ;
  5. ; This would be easy except that we have to deal with threads (and who else
  6. ; would be using pipes?).
  7. ;
  8. ; Pipes either have a fixed set of buffers which are continually recycled or
  9. ; create new buffers as needed. Having a fixed buffer supply keeps readers
  10. ; and writers more-or-less synchronised, while creating buffers as needed allows
  11. ; the writers to get arbitrarily far ahead of the readers.
  12. ;
  13. ; A fixed-buffer pipe has two buffers. At any point one is acting as the
  14. ; input buffer and the other as the output buffer. When the input buffer is
  15. ; empty the two are swapped.
  16. ;
  17. ; The complexity of the code below comes from having to deal with the two
  18. ; blocking situations:
  19. ; - a read is done when all buffers are empty
  20. ; - a write is done when the output buffer is full and the input buffer
  21. ; is non-empty (and we aren't allowed to make more buffers)
  22. ;
  23. ; If a read occurs when all buffers are empty we swap a zero-length buffer in
  24. ; for the output-buffer and block on a condition variable. The zero-length
  25. ; buffer guarantees that the reading thread will be awakened when the next
  26. ; write occurs. When a write occurs with a zero-length buffer we swap in the
  27. ; real buffer, do the write, and then set the input condition variable.
  28. ;
  29. ; When a write occurs with the write buffer full for a pipe without a fixed
  30. ; set of buffers the full buffer is added to a queue.
  31. ;
  32. ; For a pipe with only two buffers, if a write occurs with the write buffer
  33. ; full and the read buffer non-empty, we set the read-limit to be one shorter
  34. ; than its real value and block on a condition variable. The bogus read-limit
  35. ; means that the writing thread will be woken when a read empties the buffer
  ; and not have to wait until the following read. When a read reaches the
  ; read-limit we check to see if there are waiting output threads. If so, we
  ; read one more character (the one hidden by the shortened read-limit) and
  ; then wake the sleepers.
  39. ;
  40. ; If this were a little more integrated with the threads package pipes
  41. ; could use queues instead of making new condition variables all the time.
  ; Per-pipe state, stored in the data slot of the pipe's port.
  ;
  ;   lock        - held around every pipe operation to keep the pipe
  ;                 single threaded
  ;   in-condvar  - condition variable for readers waiting for a non-empty
  ;                 buffer, or #f
  ;   out-condvar - condition variable for writers waiting for an empty
  ;                 buffer, or #f
  ;   queue       - queue of full buffers, or #f for a pipe with a fixed
  ;                 buffer set
  ;   out-buffer  - the stashed output buffer
  (define-record-type pipe-data :pipe-data
    (make-pipe-data lock in-condvar out-condvar queue out-buffer)
    pipe-data?
    (lock        pipe-lock)
    (in-condvar  pipe-in-condvar   set-pipe-in-condvar!)
    (out-condvar pipe-out-condvar  set-pipe-out-condvar!)
    (queue       pipe-buffer-queue)
    (out-buffer  pipe-out-buffer   set-pipe-out-buffer!))
  ; Obtain the lock kept in the pipe-data record of PIPE.
  (define (lock pipe)
    (let ((data (port-data pipe)))
      (obtain-lock (pipe-lock data))))
  ; Release the lock kept in the pipe-data record of PIPE.
  (define (unlock pipe)
    (let ((data (port-data pipe)))
      (release-lock (pipe-lock data))))
  ; Exchange the input and output buffers of PORT.  Whatever had been
  ; written to the old output buffer becomes the readable contents of the
  ; new input buffer; both indices start again from zero.
  (define (swap-buffers! port)
    (let ((old-in  (port-in-buffer port))
          (old-out (port-out-buffer port))
          (written (port-out-index port)))
      (set-port-out-buffer! port old-in)
      (set-port-in-buffer! port old-out)
      (set-port-out-index! port 0)
      (set-port-in-index! port 0)
      (set-port-in-limit! port written)))
  ; Install the next full buffer from the queue as PORT's input buffer.
  ; Returns #t if a buffer was installed and #f if the pipe has no queue
  ; or the queue is empty.
  (define (use-buffer-from-queue port)
    (let ((pending (pipe-buffer-queue (port-data port))))
      (cond ((not pending)
             #f)
            ((queue-empty? pending)
             #f)
            (else
             (let ((full (dequeue! pending)))
               (set-port-in-buffer! port full)
               (set-port-in-index! port 0)
               (set-port-in-limit! port (code-vector-length full))
               #t)))))
  79. ;----------------------------------------------------------------
  80. ; Input buffers
  ; Try to make characters available in PORT's input buffer.  Returns #t
  ; when the buffer holds something to read and #f when it is empty and
  ; the output side is closed.  The possibilities, in the order tried:
  ;  1. The input buffer still has unread characters.
  ;  2. A writer is waiting on the output condition variable, which means
  ;     it decremented the limit; put the limit back.
  ;  3. The pipe has a queue holding a full buffer.
  ;  4. The output buffer has characters; swap the buffers.
  ;  5. The output port is open; wait for a writer and start over.
  (define (get-in-buffer port)
    (let retry ()
      (cond ((< (port-in-index port) (port-in-limit port))
             #t)
            ((pipe-out-condvar (port-data port))
             (set-port-in-limit! port (+ (port-in-limit port) 1))
             #t)
            ((use-buffer-from-queue port)
             #t)
            ((> (port-out-index port) 0)
             (swap-buffers! port)
             #t)
            ((open-output-port? port)
             (wait-for-input port)
             (retry))
            (else
             #f))))
  ; Block until the input condition variable is set.  The first waiter
  ; creates the condition variable, stashes the real output buffer in the
  ; pipe data and installs a zero-length output buffer in its place, so
  ; that the very next write has to go through the handler.  The pipe lock
  ; is released while waiting and obtained again before returning.
  (define (wait-for-input port)
    (let ((data (port-data port)))
      (if (not (pipe-in-condvar data))
          (let ((fresh (make-condvar)))
            (set-pipe-out-buffer! data (port-out-buffer port))
            (set-port-out-buffer! port (make-code-vector 0 0))
            (set-pipe-in-condvar! data fresh)))
      (let ((cv (pipe-in-condvar data)))
        (release-lock (pipe-lock data))
        (condvar-ref cv)
        (obtain-lock (pipe-lock data)))))
  ; If there is an input condition variable, and either the output buffer
  ; holds characters or the output port has been closed, clear the
  ; condition variable from the pipe data and set it to wake the readers.
  (define (wake-any-input-waiters port)
    (let* ((data (port-data port))
           (cv (pipe-in-condvar data)))
      (if (and cv
               (or (> (port-out-index port) 0)
                   (not (open-output-port? port))))
          (begin
            (set-pipe-in-condvar! data #f)
            (condvar-set! cv (unspecific))))))
  131. ;----------------------------------------------------------------
  132. ; Output buffers
  ; Try to make room in PORT's output buffer.  Returns #t when there is
  ; room to write and #f when there is none and the input side is closed.
  ; The possibilities, in the order tried:
  ;  1. The output buffer still has room.
  ;  2. The output buffer has length zero; it was swapped in by a waiting
  ;     reader, so reinstall the buffer stashed in the pipe data.
  ;  3. The pipe has a queue; queue the full buffer and make a new one.
  ;  4. The input buffer has been read completely; swap the buffers.
  ;  5. The input port is open; wait for a reader and start over.
  (define (get-out-buffer port)
    (let retry ()
      (let ((capacity (code-vector-length (port-out-buffer port))))
        (cond ((> capacity (port-out-index port))
               #t)
              ((zero? capacity)
               (set-port-out-buffer! port (pipe-out-buffer (port-data port)))
               #t)
              ((pipe-buffer-queue (port-data port))
               (make-new-out-buffer port)
               #t)
              ((= (port-in-limit port) (port-in-index port))
               (swap-buffers! port)
               #t)
              ((open-input-port? port)
               (wait-for-output port)
               (retry))
              (else
               #f)))))
  ; Put the current (full) output buffer on the pipe's queue and give PORT
  ; a new, empty output buffer of the same length.
  (define (make-new-out-buffer port)
    (let* ((full (port-out-buffer port))
           (size (code-vector-length full))
           (fresh (make-code-vector size 0)))
      (enqueue! (pipe-buffer-queue (port-data port)) full)
      (set-port-out-index! port 0)
      (set-port-out-buffer! port fresh)))
  ; Block until the output condition variable is set.  The first waiter
  ; creates the condition variable and decrements the input limit by one,
  ; so that the reader has to go through the handler for the last character
  ; in the buffer.  The pipe lock is released while waiting and obtained
  ; again before returning.
  (define (wait-for-output port)
    (let ((data (port-data port)))
      (if (not (pipe-out-condvar data))
          (let ((fresh (make-condvar)))
            (set-port-in-limit! port (- (port-in-limit port) 1))
            (set-pipe-out-condvar! data fresh)))
      (let ((cv (pipe-out-condvar data)))
        (release-lock (pipe-lock data))
        (condvar-ref cv)
        (obtain-lock (pipe-lock data)))))
  ; If there is an output condition variable, and either the input index
  ; has reached the input limit or the output port has been closed, clear
  ; the condition variable from the pipe data and set it to wake the
  ; writers.
  (define (wake-any-output-waiters port)
    (let* ((data (port-data port))
           (cv (pipe-out-condvar data)))
      (if (and cv
               (or (= (port-in-index port) (port-in-limit port))
                   (not (open-output-port? port))))
          (begin
            (set-pipe-out-condvar! data #f)
            (condvar-set! cv (unspecific))))))
  ; Used by the PEEK-CHAR method: if a writer is waiting on the output
  ; condition variable, decrement the input limit by one again.
  (define (do-not-disturb-output-waiters port)
    (let ((waiting (pipe-out-condvar (port-data port))))
      (if waiting
          (set-port-in-limit! port (- (port-in-limit port) 1)))))
  ; Close the output side of PORT, and the input side as well when
  ; CLOSE-INPUT? is true, then wake up any sleepers.
  ;
  ; A writer blocked in WAIT-FOR-OUTPUT has decremented the input limit by
  ; one.  Because the output port is closed by the time
  ; WAKE-ANY-OUTPUT-WAITERS runs, that procedure always wakes such a writer
  ; and clears the output condition variable.  GET-IN-BUFFER only puts the
  ; limit back while the condition variable is present, so the limit is
  ; restored here first; otherwise the last character in the input buffer
  ; could never be read.
  (define (close-pipe port close-input?)
    (lock port)
    (if close-input?
        (make-input-port-closed! port))
    (make-output-port-closed! port)
    (if (pipe-out-condvar (port-data port))
        (set-port-in-limit! port (+ 1 (port-in-limit port))))
    (wake-any-input-waiters port)
    (wake-any-output-waiters port)
    (unlock port))
  201. ;----------------------------------------------------------------
  ; The actual handler.  The methods are passed to MAKE-PORT-HANDLER by
  ; position, in this order: discloser, close-input-port, read-char,
  ; peek-char, char-ready?, read-block, close-output-port, write-char,
  ; write-block, force-output.  Every method that touches the buffers holds
  ; the pipe lock while it does so.
  (define pipe-handler
    (make-port-handler

     ;; discloser
     (lambda (port)
       (list 'pipe))

     ;; input port methods --------------------------

     ;; close-input-port
     (lambda (port)
       (close-pipe port #t))

     ;; The next three methods are called when the input buffer is empty

     ;; read-char
     ;; Returns the character read, or the EOF object if no input buffer
     ;; can be had.
     (lambda (port)
       (lock port)
       (cond ((get-in-buffer port)
              (let ((c (read-char port)))
                (wake-any-output-waiters port)
                (unlock port)
                c))
             (else
              (unlock port)
              (eof-object))))

     ;; peek-char
     ;; Like read-char, but the character stays in the buffer, so waiting
     ;; writers are left asleep and the wakeup limit is reset.
     (lambda (port)
       (lock port)
       (cond ((get-in-buffer port)
              (let ((c (peek-char port)))
                (do-not-disturb-output-waiters port)
                (unlock port)
                c))
             (else
              (unlock port)
              (eof-object))))

     ;; char-ready?
     ;; True if the output buffer holds characters.  Also true once the
     ;; output port has been closed: a read can then no longer block (it
     ;; returns either a character or the EOF object), and CHAR-READY? is
     ;; supposed to answer #t at end of file.
     (lambda (port)
       (or (> (port-out-index port) 0)
           (not (open-output-port? port))))

     ;; read-block - the buffer has fewer than COUNT characters
     ;; Copies COUNT characters into THING beginning at START, refilling the
     ;; input buffer as needed.  Returns the EOF object if the pipe runs dry
     ;; with the output port closed.
     ;;
     ;; A writer blocked in WAIT-FOR-OUTPUT has decremented the input limit,
     ;; hiding the last character of the buffer.  When the index reaches such
     ;; a limit the writers must not be woken yet: doing so clears the
     ;; output condition variable, GET-IN-BUFFER then no longer puts the
     ;; limit back, and the hidden character is dropped by the following
     ;; buffer swap.  Instead the limit is restored here and the hidden
     ;; character is read before anybody is woken.  REVEALED? is true for
     ;; the iteration that follows such a restoration.
     (lambda (thing start count port)
       (lock port)
       (let loop ((start start) (count count) (revealed? #f))
         (let* ((index (port-in-index port))
                (have (min (- (port-in-limit port) index)
                           count)))
           (cond ((> have 0)
                  (copy! (port-in-buffer port) index thing start have)
                  (set-port-in-index! port (+ index have))))
           ;; HIDDEN? - everything up to a decremented limit has been read.
           ;; A decremented limit is always smaller than the buffer's length,
           ;; so a limit equal to the length is taken to be the true one
           ;; (GET-IN-BUFFER may already have restored it).
           (let ((hidden? (and (not revealed?)
                               (pipe-out-condvar (port-data port))
                               (= (port-in-index port) (port-in-limit port))
                               (< (port-in-limit port)
                                  (code-vector-length (port-in-buffer port))))))
             (cond ((= have count)
                    ;; Done.  If a character is still hidden the writers stay
                    ;; asleep; the next read finds the buffer looking empty,
                    ;; restores the limit and wakes them.
                    (if (not hidden?)
                        (wake-any-output-waiters port))
                    (unlock port))
                   (hidden?
                    (set-port-in-limit! port (+ 1 (port-in-limit port)))
                    (loop (+ start have) (- count have) #t))
                   (else
                    ;; The buffer really is empty.
                    (wake-any-output-waiters port)
                    (cond ((get-in-buffer port)
                           (loop (+ start have) (- count have) #f))
                          (else
                           (unlock port)
                           (eof-object)))))))))

     ;; output port methods -------------------------

     ;; close-output-port
     (lambda (port)
       (close-pipe port #f))

     ;; write-char got a full buffer
     (lambda (char port)
       (lock port)
       (cond ((get-out-buffer port)
              (write-char char port)
              (wake-any-input-waiters port)
              (unlock port))
             (else
              (unlock port)
              (assertion-violation 'write-char "writing to a broken pipe"))))

     ;; write-block couldn't fit COUNT characters into the buffer
     ;; Copies COUNT characters from THING beginning at START, getting a new
     ;; output buffer each time the current one fills up.
     (lambda (thing start count port)
       (lock port)
       (let loop ((start start) (count count))
         (cond ((get-out-buffer port)
                (let* ((buffer (port-out-buffer port))
                       (index (port-out-index port))
                       (have (min (- (code-vector-length buffer) index)
                                  count)))
                  (cond ((> have 0)
                         (copy! thing start buffer index have)
                         (set-port-out-index! port (+ index have))))
                  (wake-any-input-waiters port)
                  (if (= have count)
                      (unlock port)
                      (loop (+ start have) (- count have)))))
               (else
                (unlock port)
                (assertion-violation 'write-block "writing to a broken pipe")))))

     ;; force-output
     (lambda (port)
       (values))))
  ; Buffer length used when MAKE-PIPE is not given a size.
  (define pipe-buffer-size 1024)

  ; Make a pipe: a single port open for both input and output.  Takes an
  ; optional size to use for the buffers.  A size of #f indicates that
  ; buffers should be made as needed (we really need omega).
  (define (make-pipe . maybe-buffer-size)
    (call-with-values
      (lambda ()
        (parse-make-pipe-args maybe-buffer-size))
      (lambda (size queue)
        (let ((status (bitwise-ior open-input-port-status
                                   open-output-port-status))
              (data (make-pipe-data (make-lock)   ; the lock
                                    #f            ; input condition variable
                                    #f            ; output condition variable
                                    queue         ; full buffer queue
                                    #f))          ; stashed output buffer
              (in-buffer (make-code-vector size 0))
              (out-buffer (make-code-vector size 0)))
          (make-port pipe-handler
                     status
                     data
                     in-buffer
                     0              ; input index
                     0              ; input limit
                     out-buffer
                     0)))))         ; output limit
  ; Decode MAKE-PIPE's optional argument list.  Returns two values: the
  ; buffer size and either a queue for full buffers or #f.
  ;   no argument            - the default size, no queue
  ;   #f                     - the default size and a new queue
  ;   exact positive integer - that size, no queue
  ; Anything else is an assertion violation.
  (define (parse-make-pipe-args maybe-buffer-size)
    (cond ((null? maybe-buffer-size)
           (values pipe-buffer-size #f))
          ((not (car maybe-buffer-size))
           (values pipe-buffer-size (make-queue)))
          (else
           (let ((size (car maybe-buffer-size)))
             (if (and (integer? size)
                      (exact? size)
                      (> size 0))
                 (values size #f)
                 (assertion-violation 'make-pipe "invalid pipe buffer size" size))))))
  ; These should probably be moved to I/O

  ; True if every bit of OPEN-INPUT-PORT-STATUS is set in PORT's status.
  (define (open-input-port? port)
    (let ((masked (bitwise-and (port-status port)
                               open-input-port-status)))
      (= masked open-input-port-status)))
  ; True if every bit of OPEN-OUTPUT-PORT-STATUS is set in PORT's status.
  (define (open-output-port? port)
    (let ((masked (bitwise-and (port-status port)
                               open-output-port-status)))
      (= masked open-output-port-status)))
  ; Copy COUNT elements from FROM, starting at I, into TO, starting at J,
  ; choosing the copier by whether each argument is a code vector.
  ; Won't do string->string copies.
  (define (copy! from i to j count)
    (cond ((not (code-vector? from))
           (copy-chars->bytes! from i to j count))
          ((code-vector? to)
           (copy-bytes! from i to j count))
          (else
           (copy-bytes->chars! from i to j count))))
  ; Copied from more-port.scm.

  ; Copy COUNT bytes from code vector FROM, starting at I, into code vector
  ; TO, starting at J.
  (define (copy-bytes! from i to j count)
    (let ((end (+ count i)))
      (let next ((src i) (dst j))
        (if (not (= src end))
            (begin
              (code-vector-set! to dst (code-vector-ref from src))
              (next (+ src 1) (+ dst 1)))))))
  ; Copy COUNT characters from string FROM, starting at I, into code vector
  ; TO, starting at J, converting each with CHAR->ASCII.
  (define (copy-chars->bytes! from i to j count)
    (let ((end (+ count i)))
      (let next ((src i) (dst j))
        (if (not (= src end))
            (begin
              (code-vector-set! to dst (char->ascii (string-ref from src)))
              (next (+ src 1) (+ dst 1)))))))
  ; Copy COUNT bytes from code vector FROM, starting at I, into string TO,
  ; starting at J, converting each with ASCII->CHAR.
  (define (copy-bytes->chars! from i to j count)
    (let ((end (+ count i)))
      (let next ((src i) (dst j))
        (if (not (= src end))
            (begin
              (string-set! to dst (ascii->char (code-vector-ref from src)))
              (next (+ src 1) (+ dst 1)))))))