port-buffer.scm 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651
  1. ; Part of Scheme 48 1.9. See file COPYING for notices and license.
  2. ; Authors: Richard Kelsey, Jonathan Rees, Mike Sperber
  3. (define (make-buffered-input-port handler data buffer index limit)
  4. (if (and (okay-buffer? buffer index limit)
  5. (port-handler? handler))
  6. (make-port handler
  7. (enum text-encoding-option latin-1)
  8. #f
  9. (bitwise-ior input-port-mask open-input-port-mask)
  10. #f ; timestamp (was lock)
  11. data
  12. buffer
  13. index
  14. limit
  15. #f ; pending-cr?
  16. #f) ; pending-eof?
  17. (assertion-violation 'make-buffered-input-port
  18. "invalid argument"
  19. handler data buffer index limit)))
  20. (define (make-buffered-output-port handler data buffer index limit)
  21. (if (and (okay-buffer? buffer index limit)
  22. (> limit 0)
  23. (port-handler? handler))
  24. (make-port handler
  25. (enum text-encoding-option latin-1)
  26. #f
  27. open-output-port-status
  28. #f ; timestamp (was lock)
  29. data
  30. buffer
  31. index
  32. limit
  33. #f ; pending-cr?
  34. #f) ; pending-eof?
  35. (assertion-violation 'make-buffered-output-port
  36. "invalid argument"
  37. handler data buffer index limit)))
  38. (define (okay-buffer? buffer index limit)
  39. (and (byte-vector? buffer)
  40. (integer? limit)
  41. (integer? index)
  42. (exact? limit)
  43. (exact? index)
  44. (<= 0 limit (byte-vector-length buffer))
  45. (<= 0 index limit)))
  46. ;----------------
  47. ; (buffered-input-port-handler discloser
  48. ; closer
  49. ; fill-buffer-proc) -> handler
  50. ;
  51. ; (fill-buffer-proc <port> <wait?>)
  52. ; -> <boolean> ; true if commit works, false if it fails
  53. ; Closer must also do a maybe-commit and return the result.
  54. ;
  55. ; If <wait?> is true then wait for input. If <wait?> is false then return
  56. ; immediately even if no input is available.
  57. (define (make-buffered-input-port-handler discloser
  58. closer!
  59. buffer-filler!
  60. ready?)
  61. (make-port-handler (lambda (port)
  62. (discloser (port-data port)))
  63. (lambda (port)
  64. (with-new-proposal (lose)
  65. (make-input-port-closed! port)
  66. (or (closer! (port-data port))
  67. (lose))))
  68. (make-one-byte-input buffer-filler!)
  69. (make-one-char-input buffer-filler!)
  70. (make-read-block buffer-filler!)
  71. (make-byte-ready? ready? #t)
  72. #f)) ; force
  73. ;----------------
  74. ; Rename an old field
  75. (define (note-buffer-reuse! port)
  76. (provisional-set-port-lock! port (cons 'timestamp '())))
  77. ; Calling this has the side-effect of getting the current proposal to check
  78. ; the timestamp value when committing.
  79. (define check-buffer-timestamp! provisional-port-lock)
  80. ; And a current field.
  81. (define port-flushed port-pending-eof?)
  82. (define set-port-flushed! set-port-pending-eof?!)
  83. ;----------------
  84. ; The READ? argument says whether we're doing a READ or a PEEK.
  85. (define (make-one-byte-input buffer-filler!)
  86. (lambda (port read?)
  87. (with-new-proposal (lose)
  88. (let ((index (provisional-port-index port))
  89. (limit (provisional-port-limit port)))
  90. (cond ((not (open-input-port? port))
  91. (remove-current-proposal!)
  92. (assertion-violation (if read? 'read-byte 'peek-byte)
  93. "invalid argument"
  94. port))
  95. ((< index limit)
  96. (if read?
  97. (provisional-set-port-index! port (+ 1 index)))
  98. (let ((b (provisional-byte-vector-ref (port-buffer port)
  99. index)))
  100. (if (maybe-commit)
  101. b
  102. (lose))))
  103. ((provisional-port-pending-eof? port)
  104. (if read?
  105. (provisional-set-port-pending-eof?! port #f))
  106. (if (maybe-commit)
  107. (eof-object)
  108. (lose)))
  109. (else
  110. (provisional-set-port-index! port 0)
  111. (provisional-set-port-limit! port 0)
  112. (buffer-filler! port #t)
  113. (lose)))))))
  114. ; The MODE argument says whether we're doing a READ (#f) , a PEEK (#t),
  115. ; or a CHAR-READY? ( () )
  116. (define (make-one-char-input buffer-filler!)
  117. (lambda (port mode)
  118. (let ((decode
  119. (text-codec-decode-char-proc (port-text-codec port))))
  120. (with-new-proposal (lose)
  121. (let ((limit (provisional-port-limit port)))
  122. (let loop ((index (provisional-port-index port)))
  123. (define (consume&deliver decode-count val)
  124. (if (not mode)
  125. (provisional-set-port-index! port
  126. (+ index decode-count)))
  127. (if (maybe-commit)
  128. val
  129. (lose)))
  130. (cond ((not (open-input-port? port))
  131. (remove-current-proposal!)
  132. (assertion-violation (cond
  133. ((not mode) 'read-char)
  134. ((null? mode) 'char-ready?)
  135. (else 'peek-char))
  136. "invalid argument"
  137. port))
  138. ((< index limit)
  139. (let ((buffer (port-buffer port)))
  140. (call-with-values
  141. (lambda ()
  142. (decode buffer index (- limit index)))
  143. (lambda (ch decode-count)
  144. (cond
  145. (ch
  146. ;; CR/LF handling. Great.
  147. (cond
  148. ((port-crlf? port)
  149. (cond
  150. ((char=? ch cr)
  151. (provisional-set-port-pending-cr?! port #t)
  152. (consume&deliver decode-count
  153. (if (null? mode) ; CHAR-READY?
  154. #t
  155. #\newline)))
  156. ((and (char=? ch #\newline)
  157. (provisional-port-pending-cr? port))
  158. (provisional-set-port-pending-cr?! port #f)
  159. (loop (+ index decode-count)))
  160. (else
  161. (provisional-set-port-pending-cr?! port #f)
  162. (consume&deliver decode-count
  163. (if (null? mode) ; CHAR-READY?
  164. #t
  165. ch)))))
  166. (else
  167. (provisional-set-port-pending-cr?! port #f)
  168. (consume&deliver decode-count
  169. (if (null? mode) ; CHAR-READY?
  170. #t
  171. ch)))))
  172. ((or (not decode-count) ; decoding error
  173. (provisional-port-pending-eof? port)) ; partial char
  174. (consume&deliver 1
  175. (if (null? mode)
  176. #t
  177. #\?)))
  178. ;; need at least DECODE-COUNT bytes
  179. (else
  180. (if (> decode-count
  181. (- (byte-vector-length buffer)
  182. limit))
  183. ;; copy what we have to the
  184. ;; beginning so there's space at the
  185. ;; end we can try to fill
  186. (begin
  187. ;; (debug-message "aligning port buffer")
  188. (attempt-copy-bytes! buffer index
  189. buffer 0
  190. (- limit index))
  191. (provisional-set-port-index! port 0)
  192. (provisional-set-port-limit! port (- limit index))))
  193. (if (or (not (buffer-filler! port (not (null? mode))))
  194. (not (null? mode)))
  195. (lose)
  196. #f)))))))
  197. ((provisional-port-pending-eof? port)
  198. (if (not mode)
  199. (provisional-set-port-pending-eof?! port #f))
  200. (cond
  201. ((not (maybe-commit))
  202. (lose))
  203. ((null? mode) #t)
  204. (else (eof-object))))
  205. (else
  206. (if (= index limit) ; we have zilch
  207. (begin
  208. (provisional-set-port-index! port 0)
  209. (provisional-set-port-limit! port 0))
  210. ;; may be out of synch because of CR/LF conversion
  211. (provisional-set-port-index! port index))
  212. (if (or (not (buffer-filler! port (not (null? mode))))
  213. (not (null? mode)))
  214. (lose)
  215. #f)))))))))
  216. ;----------------
  217. ; See if there is a byte available.
  218. (define (make-byte-ready? ready? read?)
  219. (lambda (port)
  220. (with-new-proposal (lose)
  221. (cond ((not ((if read?
  222. open-input-port?
  223. open-output-port?)
  224. port))
  225. (remove-current-proposal!)
  226. (assertion-violation 'byte-ready? "invalid argument" port))
  227. ((or (< (provisional-port-index port)
  228. (provisional-port-limit port))
  229. (and read?
  230. (provisional-port-pending-eof? port)))
  231. (if (maybe-commit)
  232. #t
  233. (lose)))
  234. (else
  235. (call-with-values
  236. (lambda ()
  237. (ready? port))
  238. (lambda (okay? ready?)
  239. (if okay?
  240. ready?
  241. (lose)))))))))
  242. ;----------------
  243. ; Block input
  244. ;
  245. ; If EOF-OKAY? is true the caller will pass an EOF back to the user. If it's
  246. ; false then the caller already has a value to pass back and we have to preserve
  247. ; an EOF for the next invocation.
  248. (define (make-read-block buffer-filler!)
  249. (lambda (port buffer start count wait?)
  250. (let loop ((have 0) (first? #t))
  251. (with-new-proposal (lose)
  252. (if (open-input-port? port)
  253. (let ((result (cond ((provisional-port-pending-eof? port)
  254. (if (= have 0)
  255. (provisional-set-port-pending-eof?! port #f))
  256. (eof-object))
  257. ((= count 0)
  258. 0)
  259. (else
  260. (get-available-bytes! buffer
  261. (+ start have)
  262. (- count have)
  263. port)))))
  264. (cond ((not result)
  265. (if (or wait? first?)
  266. (if (buffer-filler! port wait?)
  267. (loop have #f)
  268. (lose))
  269. (if (maybe-commit)
  270. 0
  271. (lose))))
  272. ((not (maybe-commit))
  273. (lose))
  274. ((eof-object? result)
  275. (if (= have 0)
  276. result
  277. have))
  278. (else
  279. (let ((have (+ have result)))
  280. (if (< have count)
  281. (loop have #f)
  282. have)))))
  283. (begin
  284. (remove-current-proposal!)
  285. (assertion-violation 'read-block "invalid argument"
  286. port buffer start count)))))))
  287. ; Copy whatever bytes are currently available.
  288. ;
  289. ; Reading the timestamp makes its value part of the current proposal. The
  290. ; timestamp is set whenever the buffer is refilled. Without it the proposal
  291. ; could be fooled if the buffer were refilled and the index and limit just
  292. ; happened to be reset to their current values.
  293. (define (get-available-bytes! buffer start count port)
  294. (let* ((index (provisional-port-index port))
  295. (have (- (provisional-port-limit port)
  296. index)))
  297. (if (< 0 have)
  298. (let ((copy-count (min have count)))
  299. (check-buffer-timestamp! port) ; makes the proposal check this
  300. (attempt-copy-bytes! (port-buffer port)
  301. index
  302. buffer
  303. start
  304. copy-count)
  305. (provisional-set-port-index! port
  306. (+ index copy-count))
  307. copy-count)
  308. (begin
  309. (provisional-set-port-index! port 0)
  310. (provisional-set-port-limit! port 0)
  311. #f))))
  312. ;----------------------------------------------------------------
  313. ; Buffered output ports
  314. ;
  315. ; (buffered-output-port-handler discloser
  316. ; closer
  317. ; empty-buffer-proc) -> handler
  318. ;
  319. ; (empty-buffer-proc <port>) -> whatever
  320. ;
  321. ; The buffer emptier must call maybe-commit.
  322. (define (make-buffered-output-port-handler discloser
  323. closer!
  324. buffer-emptier!
  325. ready?)
  326. (make-port-handler (lambda (port)
  327. (discloser (port-data port)))
  328. (make-closer closer! buffer-emptier!)
  329. (make-one-byte-output buffer-emptier!)
  330. (make-one-char-output buffer-emptier!)
  331. (make-write-block buffer-emptier!)
  332. (make-byte-ready? ready? #f)
  333. (make-forcer buffer-emptier!)))
  334. (define (make-closer closer! buffer-emptier!)
  335. (lambda (port)
  336. (with-new-proposal (lose)
  337. (let ((index (provisional-port-index port)))
  338. (cond ((not (open-output-port? port))
  339. (remove-current-proposal!)
  340. (unspecific))
  341. ((< 0 index)
  342. (buffer-emptier! port #t)
  343. (lose))
  344. (else
  345. (make-output-port-closed! port)
  346. (or (closer! (port-data port))
  347. (lose))))))))
  348. ; First check that PORT is open and then either put BYTE in PORT's buffer or
  349. ; empty the buffer and try again.
  350. (define (make-one-byte-output buffer-emptier!)
  351. (lambda (port byte)
  352. (with-new-proposal (lose)
  353. (let ((index (provisional-port-index port))
  354. (limit (byte-vector-length (port-buffer port))))
  355. (cond ((not (open-output-port? port))
  356. (remove-current-proposal!)
  357. (assertion-violation 'write-byte "invalid argument" port))
  358. ((< index limit)
  359. (provisional-byte-vector-set! (port-buffer port)
  360. index
  361. byte)
  362. (provisional-set-port-index! port (+ 1 index))
  363. (or (maybe-commit)
  364. (lose)))
  365. (else
  366. (call-to-flush! port (lambda () (buffer-emptier! port #t)))
  367. (lose)))))))
  368. (define (make-one-char-output buffer-emptier!)
  369. (lambda (port ch)
  370. (let ((encode
  371. (text-codec-encode-char-proc (port-text-codec port))))
  372. (with-new-proposal (lose)
  373. (let ((index (provisional-port-index port))
  374. (limit (byte-vector-length (port-buffer port))))
  375. (cond ((not (open-output-port? port))
  376. (remove-current-proposal!)
  377. (assertion-violation 'write-byte "invalid argument" port))
  378. ((< index limit)
  379. (let ((encode-count #f)
  380. (ok? #f))
  381. (cond
  382. ((not
  383. (maybe-commit-no-interrupts
  384. (lambda ()
  385. (if (and (port-crlf? port)
  386. (char=? ch #\newline))
  387. ;; CR/LF handling ruins our day once again
  388. (call-with-values
  389. (lambda ()
  390. (encode cr
  391. (port-buffer port)
  392. index (- limit index)))
  393. (lambda (the-ok? cr-encode-count)
  394. (cond
  395. ((or (not the-ok?)
  396. (>= (+ index cr-encode-count) limit))
  397. (set! ok? #f)
  398. (set! encode-count (+ 1 cr-encode-count))) ; LF will take at least one
  399. (else
  400. (call-with-values
  401. (lambda ()
  402. (encode #\newline
  403. (port-buffer port)
  404. (+ index cr-encode-count)
  405. (- limit (+ index cr-encode-count))))
  406. (lambda (the-ok? lf-encode-count)
  407. (set! ok? the-ok?)
  408. (if the-ok?
  409. (set-port-index! port
  410. (+ index
  411. cr-encode-count lf-encode-count))
  412. (set! encode-count (+ cr-encode-count lf-encode-count)))))))))
  413. (call-with-values
  414. (lambda ()
  415. (encode ch
  416. (port-buffer port)
  417. index (- limit index)))
  418. (lambda (the-ok? the-encode-count)
  419. (set! ok? the-ok?)
  420. (if the-ok?
  421. (set-port-index! port (+ index the-encode-count))
  422. (set! encode-count the-encode-count))))))))
  423. (lose))
  424. (ok?) ; we're done
  425. (encode-count ; need more space
  426. (with-new-proposal (_)
  427. (call-to-flush! port (lambda () (buffer-emptier! port #t))))
  428. (lose))
  429. (else ; encoding error
  430. (set! ch #\?) ; if we get an encoding error on
  431. ; the second go, we're toast
  432. (lose)))))
  433. (else
  434. (call-to-flush! port (lambda () (buffer-emptier! port #t)))
  435. (lose))))))))
  436. ; We have the following possibilities:
  437. ; - the port is no longer open
  438. ; -> raise an error
  439. ; - there is nothing to write
  440. ; -> do nothing
  441. ; - there is room left in the port's buffer
  442. ; -> copy bytes into it
  443. ; - there is no room left in the port's buffer
  444. ; -> write it out and try again
  445. (define (make-write-block buffer-emptier!)
  446. (lambda (port buffer start count)
  447. (let loop ((sent 0))
  448. (with-new-proposal (lose)
  449. (cond ((not (open-output-port? port))
  450. (remove-current-proposal!)
  451. (assertion-violation 'write-block "invalid argument"
  452. buffer start count port))
  453. ((= count 0)
  454. (if (maybe-commit)
  455. 0
  456. (lose)))
  457. ((copy-bytes-out! buffer
  458. (+ start sent)
  459. (- count sent)
  460. port)
  461. => (lambda (more)
  462. (if (maybe-commit)
  463. (let ((sent (+ sent more)))
  464. (if (< sent count)
  465. (loop sent)))
  466. (lose))))
  467. (else
  468. (call-to-flush! port (lambda () (buffer-emptier! port #t)))
  469. (lose)))))))
  470. (define (copy-bytes-out! buffer start count port)
  471. (let ((index (provisional-port-index port))
  472. (limit (byte-vector-length (port-buffer port))))
  473. (if (< index limit)
  474. (let ((copy-count (min (- limit index)
  475. count)))
  476. (check-buffer-timestamp! port) ; makes the proposal check this
  477. (provisional-set-port-index! port (+ index copy-count))
  478. (attempt-copy-bytes! buffer start
  479. (port-buffer port) index
  480. copy-count)
  481. copy-count)
  482. #f)))
  483. ; Write out anything in the buffer. When called by the auto-forcing code
  484. ; this may run across the occasional closed port.
  485. ;
  486. ; This loops by calling LOSE if the buffer-emptier's commit fails (in which
  487. ; case the emptier returns false) or if we are trying to empty the entire
  488. ; buffer (indicated by NECESSARY? being true).
  489. (define (make-forcer buffer-emptier!)
  490. (lambda (port necessary?)
  491. (with-new-proposal (lose)
  492. (cond ((not (open-output-port? port))
  493. (if necessary?
  494. (begin
  495. (remove-current-proposal!)
  496. (assertion-violation 'force-output "invalid argument" port)))
  497. (unspecific))
  498. ((< 0 (provisional-port-index port))
  499. (if (or (not (call-to-flush port (lambda () (buffer-emptier! port necessary?))))
  500. necessary?)
  501. (lose)))))))
  502. ;----------------
  503. (define (default-buffer-size)
  504. (channel-parameter (enum channel-parameter-option buffer-size)))
  505. ;----------------
  506. ; Code to periodically flush output ports.
  507. (define flush-these-ports
  508. (make-session-data-slot! (list #f)))
  509. (define (periodically-force-output! port)
  510. (let ((pair (session-data-ref flush-these-ports)))
  511. (set-cdr! pair
  512. (cons (make-weak-pointer port)
  513. (cdr pair)))))
  514. ; Return a list of thunks that will flush the buffer of each open port
  515. ; that contains bytes that have been there since the last time
  516. ; this was called. The actual i/o is done using separate threads to
  517. ; keep i/o errors from killing anything vital.
  518. ;
  519. ; If USE-FLUSHED-FLAGS? is true this won't flush buffers that have been
  520. ; flushed by someone else since the last call. If it is false then flush
  521. ; all non-empty buffers, because the system has nothing to do and is going
  522. ; to pause while waiting for external events.
  523. (define (output-port-forcers use-flushed-flags?)
  524. (let ((pair (session-data-ref flush-these-ports)))
  525. (let loop ((next (cdr pair))
  526. (last pair)
  527. (thunks '()))
  528. (if (null? next)
  529. ; (begin (debug-message "[forcing "
  530. ; (length thunks)
  531. ; " thunk(s)]")
  532. thunks ;)
  533. (let ((port (weak-pointer-ref (car next))))
  534. (cond ((or (not port) ; GCed or closed so
  535. (not (open-output-port? port))) ; drop it from the list
  536. (set-cdr! last (cdr next))
  537. (loop (cdr next) last thunks))
  538. ((eq? (port-flushed port) 'flushing) ; somebody else is doing it
  539. (loop (cdr next) next thunks))
  540. ((and use-flushed-flags? ; flushed recently
  541. (port-flushed port))
  542. (set-port-flushed! port #f) ; race condition, but harmless
  543. (loop (cdr next) next thunks))
  544. ((< 0 (port-index port)) ; non-empty
  545. (loop (cdr next) next
  546. (cons (make-forcing-thunk port)
  547. thunks)))
  548. (else ; empty
  549. (loop (cdr next) next thunks))))))))
  550. ; Returns a list of the current ports that are flushed whenever.
  551. ; This is used to flush channel ports before forking.
  552. (define (periodically-flushed-ports)
  553. (let* ((ints (set-enabled-interrupts! 0))
  554. (pair (session-data-ref flush-these-ports)))
  555. (let loop ((next (cdr pair))
  556. (last pair)
  557. (ports '()))
  558. (if (null? next)
  559. (begin
  560. (set-enabled-interrupts! ints)
  561. ports)
  562. (let ((port (weak-pointer-ref (car next))))
  563. (cond ((or (not port) ; GCed or closed
  564. (not (open-output-port? port))) ; so drop it from the list
  565. (set-cdr! last (cdr next))
  566. (loop (cdr next) last ports))
  567. (else
  568. (loop (cdr next)
  569. next
  570. (cons port ports)))))))))
  571. ; Write out PORT's buffer. If a problem occurs it is reported and PORT
  572. ; is closed.
  573. (define (make-forcing-thunk port)
  574. (lambda ()
  575. ; (debug-message "[forcing port]")
  576. (if (and (report-errors-as-warnings
  577. (lambda ()
  578. (force-output-if-open port))
  579. "error when flushing buffer; closing port"
  580. port)
  581. (open-output-port? port))
  582. (report-errors-as-warnings
  583. (lambda ()
  584. (atomically! (set-port-index! port 0)) ; prevent flushing
  585. ((port-handler-close (port-handler port))
  586. port))
  587. "error when closing port"
  588. port))))
  589. (define (call-to-flush! port thunk)
  590. (set-port-flushed! port 'flushing) ; don't let the periodic flusher go crazy
  591. (thunk)
  592. (set-port-flushed! port #t))
  593. (define (call-to-flush port thunk)
  594. (set-port-flushed! port 'flushing) ; don't let the periodic flusher go crazy
  595. (let ((retval (thunk))) ; one is enough
  596. (set-port-flushed! port #t)
  597. retval))