port-buffer.scm 19 KB


  1. ; Copyright (c) 1993-2008 by Richard Kelsey and Jonathan Rees. See file COPYING.
  2. (define (make-buffered-input-port handler data buffer index limit)
  3. (if (and (okay-buffer? buffer index limit)
  4. (port-handler? handler))
  5. (make-port handler
  6. (enum text-encoding-option latin-1)
  7. #f
  8. (bitwise-ior input-port-mask open-input-port-mask)
  9. #f ; timestamp (was lock)
  10. data
  11. buffer
  12. index
  13. limit
  14. #f ; pending-cr?
  15. #f) ; pending-eof?
  16. (call-error "invalid argument"
  17. make-buffered-input-port handler data buffer index limit)))
  18. (define (make-buffered-output-port handler data buffer index limit)
  19. (if (and (okay-buffer? buffer index limit)
  20. (> limit 0)
  21. (port-handler? handler))
  22. (make-port handler
  23. (enum text-encoding-option latin-1)
  24. #f
  25. open-output-port-status
  26. #f ; timestamp (was lock)
  27. data
  28. buffer
  29. index
  30. limit
  31. #f ; pending-cr?
  32. #f) ; pending-eof?
  33. (call-error "invalid argument"
  34. make-buffered-output-port handler data buffer index limit)))
  35. (define (okay-buffer? buffer index limit)
  36. (and (byte-vector? buffer)
  37. (integer? limit)
  38. (integer? index)
  39. (exact? limit)
  40. (exact? index)
  41. (<= 0 limit (byte-vector-length buffer))
  42. (<= 0 index limit)))
  43. ;----------------
  44. ; (buffered-input-port-handler discloser
  45. ; closer
  46. ; fill-buffer-proc) -> handler
  47. ;
  48. ; (fill-buffer-proc <port> <wait?>)
  49. ; -> <boolean> ; true if commit works, false if it fails
  50. ; Closer must also do a maybe-commit and return the result.
  51. ;
  52. ; If <wait?> is true then wait for input. If <wait?> is false then return
  53. ; immediately even if no input is available.
  54. (define (make-buffered-input-port-handler discloser
  55. closer!
  56. buffer-filler!
  57. ready?)
  58. (make-port-handler (lambda (port)
  59. (discloser (port-data port)))
  60. (lambda (port)
  61. (with-new-proposal (lose)
  62. (make-input-port-closed! port)
  63. (or (closer! (port-data port))
  64. (lose))))
  65. (make-one-byte-input buffer-filler!)
  66. (make-one-char-input buffer-filler!)
  67. (make-read-block buffer-filler!)
  68. (make-byte-ready? ready? #t)
  69. #f)) ; force
  70. ;----------------
  71. ; Rename an old field
  72. (define (note-buffer-reuse! port)
  73. (provisional-set-port-lock! port (cons 'timestamp '())))
  74. ; Calling this has the side-effect of getting the current proposal to check
  75. ; the timestamp value when committing.
  76. (define check-buffer-timestamp! provisional-port-lock)
  77. ; And a current field.
  78. (define port-flushed port-pending-eof?)
  79. (define set-port-flushed! set-port-pending-eof?!)
  80. ;----------------
  81. ; The READ? argument says whether we're doing a READ or a PEEK.
  82. (define (make-one-byte-input buffer-filler!)
  83. (lambda (port read?)
  84. (with-new-proposal (lose)
  85. (let ((index (provisional-port-index port))
  86. (limit (provisional-port-limit port)))
  87. (cond ((not (open-input-port? port))
  88. (remove-current-proposal!)
  89. (call-error "invalid argument"
  90. (if read? read-byte peek-byte)
  91. port))
  92. ((< index limit)
  93. (if read?
  94. (provisional-set-port-index! port (+ 1 index)))
  95. (let ((b (provisional-byte-vector-ref (port-buffer port)
  96. index)))
  97. (if (maybe-commit)
  98. b
  99. (lose))))
  100. ((provisional-port-pending-eof? port)
  101. (if read?
  102. (provisional-set-port-pending-eof?! port #f))
  103. (if (maybe-commit)
  104. (eof-object)
  105. (lose)))
  106. (else
  107. (provisional-set-port-index! port 0)
  108. (provisional-set-port-limit! port 0)
  109. (buffer-filler! port #t)
  110. (lose)))))))
  111. ; The MODE argument says whether we're doing a READ (#f) , a PEEK (#t),
  112. ; or a CHAR-READY? ( () )
  113. (define (make-one-char-input buffer-filler!)
  114. (lambda (port mode)
  115. (let ((decode
  116. (text-codec-decode-char-proc (port-text-codec port))))
  117. (with-new-proposal (lose)
  118. (let ((limit (provisional-port-limit port)))
  119. (let loop ((index (provisional-port-index port)))
  120. (define (consume&deliver decode-count val)
  121. (if (not mode)
  122. (provisional-set-port-index! port
  123. (+ index decode-count)))
  124. (if (maybe-commit)
  125. val
  126. (lose)))
  127. (cond ((not (open-input-port? port))
  128. (remove-current-proposal!)
  129. (call-error "invalid argument"
  130. (cond
  131. ((not mode) read-char)
  132. ((null? mode) char-ready?)
  133. (else peek-char))
  134. port))
  135. ((< index limit)
  136. (let ((buffer (port-buffer port)))
  137. (call-with-values
  138. (lambda ()
  139. (decode buffer index (- limit index)))
  140. (lambda (ch decode-count)
  141. (cond
  142. (ch
  143. ;; CR/LF handling. Great.
  144. (cond
  145. ((port-crlf? port)
  146. (cond
  147. ((char=? ch cr)
  148. (provisional-set-port-pending-cr?! port #t)
  149. (consume&deliver decode-count
  150. (if (null? mode) ; CHAR-READY?
  151. #t
  152. #\newline)))
  153. ((and (char=? ch #\newline)
  154. (provisional-port-pending-cr? port))
  155. (provisional-set-port-pending-cr?! port #f)
  156. (loop (+ index decode-count)))
  157. (else
  158. (provisional-set-port-pending-cr?! port #f)
  159. (consume&deliver decode-count
  160. (if (null? mode) ; CHAR-READY?
  161. #t
  162. ch)))))
  163. (else
  164. (provisional-set-port-pending-cr?! port #f)
  165. (consume&deliver decode-count
  166. (if (null? mode) ; CHAR-READY?
  167. #t
  168. ch)))))
  169. ((or (not decode-count) ; decoding error
  170. (provisional-port-pending-eof? port)) ; partial char
  171. (consume&deliver 1
  172. (if (null? mode)
  173. #t
  174. #\?)))
  175. ;; need at least DECODE-COUNT bytes
  176. (else
  177. (if (> decode-count
  178. (- (byte-vector-length buffer)
  179. limit))
  180. ;; copy what we have to the
  181. ;; beginning so there's space at the
  182. ;; end we can try to fill
  183. (begin
  184. ;; (debug-message "aligning port buffer")
  185. (attempt-copy-bytes! buffer index
  186. buffer 0
  187. (- limit index))
  188. (provisional-set-port-index! port 0)
  189. (provisional-set-port-limit! port (- limit index))))
  190. (if (or (not (buffer-filler! port (not (null? mode))))
  191. (not (null? mode)))
  192. (lose)
  193. #f)))))))
  194. ((provisional-port-pending-eof? port)
  195. (if (not mode)
  196. (provisional-set-port-pending-eof?! port #f))
  197. (cond
  198. ((not (maybe-commit))
  199. (lose))
  200. ((null? mode) #t)
  201. (else (eof-object))))
  202. (else
  203. (if (= index limit) ; we have zilch
  204. (begin
  205. (provisional-set-port-index! port 0)
  206. (provisional-set-port-limit! port 0))
  207. ;; may be out of synch because of CR/LF conversion
  208. (provisional-set-port-index! port index))
  209. (if (or (not (buffer-filler! port (not (null? mode))))
  210. (not (null? mode)))
  211. (lose)
  212. #f)))))))))
  213. ;----------------
  214. ; See if there is a byte available.
  215. (define (make-byte-ready? ready? read?)
  216. (lambda (port)
  217. (with-new-proposal (lose)
  218. (cond ((not ((if read?
  219. open-input-port?
  220. open-output-port?)
  221. port))
  222. (remove-current-proposal!)
  223. (call-error "invalid argument" byte-ready? port))
  224. ((or (< (provisional-port-index port)
  225. (provisional-port-limit port))
  226. (and read?
  227. (provisional-port-pending-eof? port)))
  228. (if (maybe-commit)
  229. #t
  230. (lose)))
  231. (else
  232. (call-with-values
  233. (lambda ()
  234. (ready? port))
  235. (lambda (okay? ready?)
  236. (if okay?
  237. ready?
  238. (lose)))))))))
  239. ;----------------
  240. ; Block input
  241. ;
  242. ; If EOF-OKAY? is true the caller will pass an EOF back to the user. If it's
  243. ; false then the caller already has a value to pass back and we have to preserve
  244. ; an EOF for the next invocation.
  245. (define (make-read-block buffer-filler!)
  246. (lambda (port buffer start count wait?)
  247. (let loop ((have 0) (first? #t))
  248. (with-new-proposal (lose)
  249. (if (open-input-port? port)
  250. (let ((result (cond ((provisional-port-pending-eof? port)
  251. (if (= have 0)
  252. (provisional-set-port-pending-eof?! port #f))
  253. (eof-object))
  254. ((= count 0)
  255. 0)
  256. (else
  257. (get-available-bytes! buffer
  258. (+ start have)
  259. (- count have)
  260. port)))))
  261. (cond ((not result)
  262. (if (or wait? first?)
  263. (if (buffer-filler! port wait?)
  264. (loop have #f)
  265. (lose))
  266. (if (maybe-commit)
  267. 0
  268. (lose))))
  269. ((not (maybe-commit))
  270. (lose))
  271. ((eof-object? result)
  272. (if (= have 0)
  273. result
  274. have))
  275. (else
  276. (let ((have (+ have result)))
  277. (if (< have count)
  278. (loop have #f)
  279. have)))))
  280. (begin
  281. (remove-current-proposal!)
  282. (call-error "invalid argument" read-block port buffer start count)))))))
  283. ; Copy whatever bytes are currently available.
  284. ;
  285. ; Reading the timestamp makes its value part of the current proposal. The
  286. ; timestamp is set whenever the buffer is refilled. Without it the proposal
  287. ; could be fooled if the buffer were refilled and the index and limit just
  288. ; happened to be reset to their current values.
  289. (define (get-available-bytes! buffer start count port)
  290. (let* ((index (provisional-port-index port))
  291. (have (- (provisional-port-limit port)
  292. index)))
  293. (if (< 0 have)
  294. (let ((copy-count (min have count)))
  295. (check-buffer-timestamp! port) ; makes the proposal check this
  296. (attempt-copy-bytes! (port-buffer port)
  297. index
  298. buffer
  299. start
  300. copy-count)
  301. (provisional-set-port-index! port
  302. (+ index copy-count))
  303. copy-count)
  304. (begin
  305. (provisional-set-port-index! port 0)
  306. (provisional-set-port-limit! port 0)
  307. #f))))
  308. ;----------------------------------------------------------------
  309. ; Buffered output ports
  310. ;
  311. ; (buffered-output-port-handler discloser
  312. ; closer
  313. ; empty-buffer-proc) -> handler
  314. ;
  315. ; (empty-buffer-proc <port>) -> whatever
  316. ;
  317. ; The buffer emptier must call maybe-commit.
  318. (define (make-buffered-output-port-handler discloser
  319. closer!
  320. buffer-emptier!
  321. ready?)
  322. (make-port-handler (lambda (port)
  323. (discloser (port-data port)))
  324. (make-closer closer! buffer-emptier!)
  325. (make-one-byte-output buffer-emptier!)
  326. (make-one-char-output buffer-emptier!)
  327. (make-write-block buffer-emptier!)
  328. (make-byte-ready? ready? #f)
  329. (make-forcer buffer-emptier!)))
  330. (define (make-closer closer! buffer-emptier!)
  331. (lambda (port)
  332. (with-new-proposal (lose)
  333. (let ((index (provisional-port-index port)))
  334. (cond ((not (open-output-port? port))
  335. (remove-current-proposal!)
  336. (unspecific))
  337. ((< 0 index)
  338. (buffer-emptier! port #t)
  339. (lose))
  340. (else
  341. (make-output-port-closed! port)
  342. (or (closer! (port-data port))
  343. (lose))))))))
  344. ; First check that PORT is open and then either put BYTE in PORT's buffer or
  345. ; empty the buffer and try again.
  346. (define (make-one-byte-output buffer-emptier!)
  347. (lambda (port byte)
  348. (with-new-proposal (lose)
  349. (let ((index (provisional-port-index port))
  350. (limit (byte-vector-length (port-buffer port))))
  351. (cond ((not (open-output-port? port))
  352. (remove-current-proposal!)
  353. (call-error "invalid argument" write-byte port))
  354. ((< index limit)
  355. (provisional-byte-vector-set! (port-buffer port)
  356. index
  357. byte)
  358. (provisional-set-port-index! port (+ 1 index))
  359. (or (maybe-commit)
  360. (lose)))
  361. (else
  362. (call-to-flush! port (lambda () (buffer-emptier! port #t)))
  363. (lose)))))))
  364. (define (make-one-char-output buffer-emptier!)
  365. (lambda (port ch)
  366. (let ((encode
  367. (text-codec-encode-char-proc (port-text-codec port))))
  368. (with-new-proposal (lose)
  369. (let ((index (provisional-port-index port))
  370. (limit (byte-vector-length (port-buffer port))))
  371. (cond ((not (open-output-port? port))
  372. (remove-current-proposal!)
  373. (call-error "invalid argument" write-byte port))
  374. ((< index limit)
  375. (let ((encode-count #f)
  376. (ok? #f))
  377. (cond
  378. ((not
  379. (maybe-commit-no-interrupts
  380. (lambda ()
  381. (if (and (port-crlf? port)
  382. (char=? ch #\newline))
  383. ;; CR/LF handling ruins our day once again
  384. (call-with-values
  385. (lambda ()
  386. (encode cr
  387. (port-buffer port)
  388. index (- limit index)))
  389. (lambda (the-ok? cr-encode-count)
  390. (cond
  391. ((or (not the-ok?)
  392. (>= (+ index cr-encode-count) limit))
  393. (set! ok? #f)
  394. (set! encode-count (+ 1 cr-encode-count))) ; LF will take at least one
  395. (else
  396. (call-with-values
  397. (lambda ()
  398. (encode #\newline
  399. (port-buffer port)
  400. (+ index cr-encode-count)
  401. (- limit (+ index cr-encode-count))))
  402. (lambda (the-ok? lf-encode-count)
  403. (set! ok? the-ok?)
  404. (if the-ok?
  405. (set-port-index! port
  406. (+ index
  407. cr-encode-count lf-encode-count))
  408. (set! encode-count (+ cr-encode-count lf-encode-count)))))))))
  409. (call-with-values
  410. (lambda ()
  411. (encode ch
  412. (port-buffer port)
  413. index (- limit index)))
  414. (lambda (the-ok? the-encode-count)
  415. (set! ok? the-ok?)
  416. (if the-ok?
  417. (set-port-index! port (+ index the-encode-count))
  418. (set! encode-count the-encode-count))))))))
  419. (lose))
  420. (ok?) ; we're done
  421. (encode-count ; need more space
  422. (with-new-proposal (_)
  423. (call-to-flush! port (lambda () (buffer-emptier! port #t))))
  424. (lose))
  425. (else ; encoding error
  426. (set! ch #\?) ; if we get an encoding error on
  427. ; the second go, we're toast
  428. (lose)))))
  429. (else
  430. (call-to-flush! port (lambda () (buffer-emptier! port #t)))
  431. (lose))))))))
  432. ; We have the following possibilities:
  433. ; - the port is no longer open
  434. ; -> raise an error
  435. ; - there is nothing to write
  436. ; -> do nothing
  437. ; - there is room left in the port's buffer
  438. ; -> copy bytes into it
  439. ; - there is no room left in the port's buffer
  440. ; -> write it out and try again
  441. (define (make-write-block buffer-emptier!)
  442. (lambda (port buffer start count)
  443. (let loop ((sent 0))
  444. (with-new-proposal (lose)
  445. (cond ((not (open-output-port? port))
  446. (remove-current-proposal!)
  447. (call-error "invalid argument"
  448. write-block buffer start count port))
  449. ((= count 0)
  450. (if (maybe-commit)
  451. 0
  452. (lose)))
  453. ((copy-bytes-out! buffer
  454. (+ start sent)
  455. (- count sent)
  456. port)
  457. => (lambda (more)
  458. (if (maybe-commit)
  459. (let ((sent (+ sent more)))
  460. (if (< sent count)
  461. (loop sent)))
  462. (lose))))
  463. (else
  464. (call-to-flush! port (lambda () (buffer-emptier! port #t)))
  465. (lose)))))))
  466. (define (copy-bytes-out! buffer start count port)
  467. (let ((index (provisional-port-index port))
  468. (limit (byte-vector-length (port-buffer port))))
  469. (if (< index limit)
  470. (let ((copy-count (min (- limit index)
  471. count)))
  472. (check-buffer-timestamp! port) ; makes the proposal check this
  473. (provisional-set-port-index! port (+ index copy-count))
  474. (attempt-copy-bytes! buffer start
  475. (port-buffer port) index
  476. copy-count)
  477. copy-count)
  478. #f)))
  479. ; Write out anything in the buffer. When called by the auto-forcing code
  480. ; this may run across the occasional closed port.
  481. ;
  482. ; This loops by calling LOSE if the buffer-emptier's commit fails (in which
  483. ; case the emptier returns false) or if we are trying to empty the entire
  484. ; buffer (indicated by NECESSARY? being true).
  485. (define (make-forcer buffer-emptier!)
  486. (lambda (port necessary?)
  487. (with-new-proposal (lose)
  488. (cond ((not (open-output-port? port))
  489. (if necessary?
  490. (begin
  491. (remove-current-proposal!)
  492. (call-error "invalid argument" force-output port)))
  493. (unspecific))
  494. ((< 0 (provisional-port-index port))
  495. (if (or (not (call-to-flush port (lambda () (buffer-emptier! port necessary?))))
  496. necessary?)
  497. (lose)))))))
  498. ;----------------
  499. (define (default-buffer-size)
  500. (channel-parameter (enum channel-parameter-option buffer-size)))
  501. ;----------------
  502. ; Code to periodically flush output ports.
  503. (define flush-these-ports
  504. (make-session-data-slot! (list #f)))
  505. (define (periodically-force-output! port)
  506. (let ((pair (session-data-ref flush-these-ports)))
  507. (set-cdr! pair
  508. (cons (make-weak-pointer port)
  509. (cdr pair)))))
  510. ; Return a list of thunks that will flush the buffer of each open port
  511. ; that contains bytes that have been there since the last time
  512. ; this was called. The actual i/o is done using separate threads to
  513. ; keep i/o errors from killing anything vital.
  514. ;
  515. ; If USE-FLUSHED-FLAGS? is true this won't flush buffers that have been
  516. ; flushed by someone else since the last call. If it is false then flush
  517. ; all non-empty buffers, because the system has nothing to do and is going
  518. ; to pause while waiting for external events.
  519. (define (output-port-forcers use-flushed-flags?)
  520. (let ((pair (session-data-ref flush-these-ports)))
  521. (let loop ((next (cdr pair))
  522. (last pair)
  523. (thunks '()))
  524. (if (null? next)
  525. ; (begin (debug-message "[forcing "
  526. ; (length thunks)
  527. ; " thunk(s)]")
  528. thunks ;)
  529. (let ((port (weak-pointer-ref (car next))))
  530. (cond ((or (not port) ; GCed or closed so
  531. (not (open-output-port? port))) ; drop it from the list
  532. (set-cdr! last (cdr next))
  533. (loop (cdr next) last thunks))
  534. ((eq? (port-flushed port) 'flushing) ; somebody else is doing it
  535. (loop (cdr next) next thunks))
  536. ((and use-flushed-flags? ; flushed recently
  537. (port-flushed port))
  538. (set-port-flushed! port #f) ; race condition, but harmless
  539. (loop (cdr next) next thunks))
  540. ((< 0 (port-index port)) ; non-empty
  541. (loop (cdr next) next
  542. (cons (make-forcing-thunk port)
  543. thunks)))
  544. (else ; empty
  545. (loop (cdr next) next thunks))))))))
  546. ; Returns a list of the current ports that are flushed whenever.
  547. ; This is used to flush channel ports before forking.
  548. (define (periodically-flushed-ports)
  549. (let* ((ints (set-enabled-interrupts! 0))
  550. (pair (session-data-ref flush-these-ports)))
  551. (let loop ((next (cdr pair))
  552. (last pair)
  553. (ports '()))
  554. (if (null? next)
  555. (begin
  556. (set-enabled-interrupts! ints)
  557. ports)
  558. (let ((port (weak-pointer-ref (car next))))
  559. (cond ((or (not port) ; GCed or closed
  560. (not (open-output-port? port))) ; so drop it from the list
  561. (set-cdr! last (cdr next))
  562. (loop (cdr next) last ports))
  563. (else
  564. (loop (cdr next)
  565. next
  566. (cons port ports)))))))))
  567. ; Write out PORT's buffer. If a problem occurs it is reported and PORT
  568. ; is closed.
  569. (define (make-forcing-thunk port)
  570. (lambda ()
  571. ; (debug-message "[forcing port]")
  572. (if (and (report-errors-as-warnings
  573. (lambda ()
  574. (force-output-if-open port))
  575. "error when flushing buffer; closing port"
  576. port)
  577. (open-output-port? port))
  578. (report-errors-as-warnings
  579. (lambda ()
  580. (atomically! (set-port-index! port 0)) ; prevent flushing
  581. ((port-handler-close (port-handler port))
  582. port))
  583. "error when closing port"
  584. port))))
  585. (define (call-to-flush! port thunk)
  586. (set-port-flushed! port 'flushing) ; don't let the periodic flusher go crazy
  587. (thunk)
  588. (set-port-flushed! port #t))
  589. (define (call-to-flush port thunk)
  590. (set-port-flushed! port 'flushing) ; don't let the periodic flusher go crazy
  591. (let ((retval (thunk))) ; one is enough
  592. (set-port-flushed! port #t)
  593. retval))