mq-stream.scm 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788
  1. ;; This file is part of scheme-GNUnet.
  2. ;; Copyright (C) 2021 GNUnet e.V.
  3. ;;
  4. ;; scheme-GNUnet is free software: you can redistribute it and/or modify it
  5. ;; under the terms of the GNU Affero General Public License as published
  6. ;; by the Free Software Foundation, either version 3 of the License,
  7. ;; or (at your option) any later version.
  8. ;;
  9. ;; scheme-GNUnet is distributed in the hope that it will be useful, but
  10. ;; WITHOUT ANY WARRANTY; without even the implied warranty of
  11. ;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  12. ;; Affero General Public License for more details.
  13. ;;
  14. ;; You should have received a copy of the GNU Affero General Public License
  15. ;; along with this program. If not, see <http://www.gnu.org/licenses/>.
  16. ;;
  17. ;; SPDX-License-Identifier: AGPL-3.0-or-later
  18. (use-modules (gnu gnunet mq-impl stream)
  19. (gnu gnunet mq)
  20. (gnu gnunet mq handler)
  21. (gnu gnunet utils hat-let)
  22. (gnu gnunet utils bv-slice)
  23. (gnu gnunet config db)
  24. (gnu gnunet concurrency repeated-condition)
  25. (fibers conditions)
  26. (fibers operations)
  27. (fibers)
  28. ((rnrs arithmetic bitwise) #:select (bitwise-ior))
  29. (rnrs bytevectors)
  30. ((rnrs io ports) #:select (open-bytevector-input-port))
  31. ((rnrs base) #:select (assert))
  32. (rnrs hashtables)
  33. ((rnrs exceptions) #:select (guard))
  34. (srfi srfi-26)
  35. (srfi srfi-43)
  36. (rnrs io ports)
  37. (ice-9 atomic)
  38. (ice-9 binary-ports)
  39. (ice-9 suspendable-ports)
  40. (ice-9 control)
  41. (ice-9 match)
  42. (ice-9 threads)
  43. (tests utils))
  44. (define (no-sender . _)
  45. (error "no sender!"))
  46. (define no-handlers (message-handlers))
  47. (define (no-error-handler . _)
  48. (error "no error handler!"))
  49. (test-begin "mq-stream")
  50. (define (check-slice-equal slice bv)
  51. (let^ ((!! (assert (= (slice-length slice)
  52. (bytevector-length bv))))
  53. (! slice-copy (make-bytevector (slice-length slice)))
  54. (! copy (bv-slice/read-write slice-copy))
  55. (<-- () (slice-copy! slice copy))
  56. (!! (bytevector=? slice-copy bv)))
  57. (values)))
  58. ;; Without interposition, and the verifier always
  59. ;; returns #t.
  60. (define (simple-handler the-type handle)
  61. (message-handler
  62. (type the-type)
  63. ((interpose code) code)
  64. ((well-formed? _) #t)
  65. ((handle! x) (handle x))))
  66. ;; Why isn't this the default? This stops the process from
  67. ;; exiting instead of raising an EPIPE system-error when
  68. ;; writing to a broken pipe.
  69. (sigaction SIGPIPE SIG_IGN)
  70. (define (handle-input*! mq input)
  71. (call-with-values (lambda () (handle-input! mq input))
  72. (lambda e
  73. (apply inject-error! mq e)
  74. (values))))
  75. (test-assert "messages + eof are injected in-order"
  76. (let^ ((! input/bv #vu8(0 4 0 1 ; Message type 1, size 4
  77. 0 5 0 2 1 ; Message type 2, size 6
  78. 0 6 0 3 2 1)) ; Message type 3, size 7
  79. (! input (open-bytevector-input-port input/bv))
  80. (! received 0)
  81. (! (make-handler type expected-received expected-bv)
  82. (simple-handler
  83. type
  84. (lambda (slice)
  85. (assert (equal? received expected-received))
  86. (check-slice-equal slice expected-bv)
  87. (set! received (+ 1 received)))))
  88. (! handler/1 (make-handler 1 0 #vu8(0 4 0 1)))
  89. (! handler/2 (make-handler 2 1 #vu8(0 5 0 2 1)))
  90. (! handler/3 (make-handler 3 2 #vu8(0 6 0 3 2 1)))
  91. (! handlers
  92. (message-handlers handler/1 handler/2 handler/3))
  93. (! (error-handler . arguments)
  94. (assert (equal? received 3))
  95. (assert (equal? arguments '(input:regular-end-of-file)))
  96. (set! received 'end-of-file))
  97. (! mq (make-message-queue handlers error-handler no-sender))
  98. (<-- () (handle-input*! mq input)))
  99. ;; TODO: should the port be closed?
  100. (assert (equal? received 'end-of-file))))
  101. (test-assert "overly small message is detected (--> stop)"
  102. (let^ ((! input/bv #vu8(0 4 0 0 ; Message type 0, size 4
  103. 0 3 9 ; Overly small message, size 3, type != 0
  104. 0 4 0 1)) ; Message type 1, size 4
  105. ;; The first message is well-formatted and should therefore
  106. ;; be injected. The second one isn't, so an appropriate error should
  107. ;; injected. Then the message stream is broken, so the third
  108. ;; message shouldn't be injected.
  109. (! input (open-bytevector-input-port input/bv))
  110. (! received 0)
  111. (! handler/0
  112. (simple-handler 0
  113. (lambda (slice)
  114. (assert (equal? received 0))
  115. (check-slice-equal slice #vu8(0 4 0 0))
  116. (set! received 1))))
  117. (! handlers
  118. (message-handlers handler/0))
  119. (! (error-handler . arguments)
  120. (assert (equal? received 1))
  121. ;; Whether this malformed even has a message type is dubious,
  122. ;; but if it has one, it will be (* 256 9).
  123. (assert (equal? arguments `(input:overly-small ,(* 256 9) 3)))
  124. (set! received 'overly-small))
  125. (! mq (make-message-queue handlers error-handler no-sender))
  126. (<-- () (handle-input*! mq input)))
  127. (assert (equal? received 'overly-small))))
  128. (test-assert "premature eof is detected (--> stop)"
  129. (let^ ((! input/bv #vu8(0 8 7 6 5 4))
  130. (! input (open-bytevector-input-port input/bv))
  131. (! received #f)
  132. (! (error-handler . arguments)
  133. (assert (eq? received #f))
  134. (assert (equal? arguments '(input:premature-end-of-file)))
  135. (set! received #t))
  136. (! mq (make-message-queue no-handlers error-handler no-sender))
  137. (<-- () (handle-input*! mq input)))
  138. (assert (equal? received #t))))
  139. (test-equal "envelopes are written (no blocking)"
  140. ;; Three messages
  141. #vu8(0 4 0 1
  142. 0 4 0 2
  143. 0 4 0 3)
  144. (let^ ((! messages #(#vu8(0 4 0 1)
  145. #vu8(0 4 0 2)
  146. #vu8(0 4 0 3)))
  147. (<-- (port get-bytevector) (open-bytevector-output-port))
  148. (! mq (make-message-queue no-handlers no-error-handler
  149. (lambda (_) (values))))
  150. (! (insert-message index message)
  151. (send-message! mq (slice/read-only (bv-slice/read-write message))))
  152. (<-- ()
  153. (begin
  154. (vector-for-each insert-message messages)
  155. (values)))
  156. (<-- ()
  157. ;; The implementation detail that 'send-round'
  158. ;; is called before 'wait!' is assumed here.
  159. (let/ec ec
  160. (handle-output! mq port ec)
  161. (error "unreachable"))))
  162. (get-bytevector)))
  163. (define (blocking-output-port port . block-positions)
  164. (define (close)
  165. (close-port port))
  166. (define (write! bv index length)
  167. (define p (port-position port))
  168. (if (or (null? block-positions)
  169. (< (+ p length) (car block-positions)))
  170. (begin (put-bytevector port bv index length) length)
  171. (let ((short (- (car block-positions) p)))
  172. (put-bytevector port bv index short)
  173. ((current-write-waiter) port/blocking)
  174. (set! block-positions (cdr block-positions))
  175. short)))
  176. (define port/blocking
  177. (make-custom-binary-output-port "" write! #f #f close))
  178. (setvbuf port/blocking 'none)
  179. port/blocking)
  180. ;; The ‘blocking’ is to make this test case more interesting.
  181. ;; It does not currently have any effect, but it is expected
  182. ;; that the implementation of handle-output! will be changed
  183. ;; to react to blocking, for implementing message queue
  184. ;; shutdown.
  185. (test-equal "repeatable conditions can be used (blocking)"
  186. '(#vu8(0 4 0 1 0 4 0 2) . 4) ; 4: number of times writing blocks
  187. (let^ ((! rcvar (make-repeated-condition))
  188. (! stop? (make-condition))
  189. (! stopped? (make-condition))
  190. (! (interrupt! mq)
  191. (trigger-condition! rcvar))
  192. (! escape/output (make-parameter #f))
  193. (<-- (out/internal get-bytevector)
  194. (open-bytevector-output-port))
  195. ;; block writing a few times
  196. (! out (blocking-output-port out/internal 0 1 3 7))
  197. (! (wait!)
  198. (perform-operation
  199. (apply choice-operation
  200. (prepare-await-trigger! rcvar)
  201. (if (>= 8 (port-position out/internal))
  202. (list (wrap-operation
  203. (wait-operation stop?)
  204. (lambda () ((escape/output)))))
  205. '()))))
  206. (! mq (make-message-queue no-handlers no-error-handler interrupt!))
  207. (! n/blocked 0)
  208. (! message/1 #vu8(0 4 0 1))
  209. (! message/2 #vu8(0 4 0 2)))
  210. (run-fibers
  211. (lambda ()
  212. (spawn-fiber
  213. (lambda ()
  214. (let/ec ec
  215. (parameterize ((escape/output ec)
  216. (current-write-waiter
  217. (lambda (port)
  218. (cond ((eq? port out)
  219. (set! n/blocked (+ n/blocked 1)))
  220. ((file-port? port)
  221. ;; XXX ‘Attempt to suspend fiber within
  222. ;; continuaton barrier’
  223. #;((@@ (fibers) wait-for-writable) port)
  224. (select '() (list port) '()))))))
  225. (handle-output! mq out wait!)))
  226. (signal-condition! stopped?)))
  227. (send-message! mq (bv-slice/read-write message/1))
  228. (sleep 0.001)
  229. (send-message! mq (bv-slice/read-write message/2))
  230. (sleep 0.001)
  231. (signal-condition! stop?)
  232. (wait stopped?)
  233. (cons (get-bytevector) n/blocked))
  234. #:parallelism 1
  235. #:hz 0)))
  236. (define (call-with-temporary-directory proc)
  237. (let ((file (mkdtemp (in-vicinity (or (getenv "TMPDIR") "/tmp")
  238. "test-XXXXXX"))))
  239. (with-exception-handler
  240. (lambda (e)
  241. (system* "rm" "-r" file)
  242. (raise-exception e))
  243. (lambda ()
  244. (call-with-values
  245. (lambda () (proc file))
  246. (lambda the-values
  247. (system* "rm" "-r" file)
  248. (apply values the-values)))))))
  249. (define (make-config where)
  250. (hash->configuration
  251. (alist->hash-table
  252. `((("service" . "UNIXPATH") . ,where)))))
  253. (define (call-with-socket-location proc)
  254. (call-with-temporary-directory
  255. (lambda (dir)
  256. (define where (in-vicinity dir "sock.et"))
  257. (define config (make-config where))
  258. (proc where config))))
  259. (define (connect/test config connected?)
  260. (define (error-handler . error)
  261. (match error
  262. ;; The connection is closed by 'test-connection'.
  263. ;; If 'test-connection' doesn't close the connection,
  264. ;; then the GC would. In both cases, this error would
  265. ;; happen.
  266. (('input:regular-end-of-file) (values))
  267. (('connection:connected) (signal-condition! connected?))))
  268. (connect/fibers config "service" no-handlers error-handler
  269. #:spawn call-with-new-thread))
  270. (define (alist->hash-table alist)
  271. (define h (make-hashtable (lambda (key) 0) equal?))
  272. (define (insert! key+value)
  273. (hashtable-set! h (car key+value) (cdr key+value)))
  274. (for-each insert! alist)
  275. h)
  276. (define (test-connection mq server-sock)
  277. (send-message! mq (bv-slice/read-write #vu8(0 4 0 0)))
  278. (let ((client (car (accept server-sock))))
  279. (assert (equal? #vu8(0 4 0 0) (get-bytevector-n client 4)))
  280. (close-port client)
  281. #t))
  282. (define (yield-many)
  283. ;; Give the new threads some time to run before binding the socket.
  284. ;; This allowed a bug in the use of 'connect' to be detected.
  285. (let loop ((n (* 8 (+ 1 (length (all-threads))))))
  286. (when (> n 0)
  287. (yield)
  288. (loop (- n 1)))))
  289. (test-assert "connect-unix, can connect when socket is already listening"
  290. (call-with-socket-location
  291. (lambda (where config)
  292. (define listening-sock (socket PF_UNIX SOCK_STREAM 0))
  293. (define connected? (make-condition))
  294. (bind listening-sock AF_UNIX where)
  295. (listen listening-sock 1)
  296. (define mq (connect/test config connected?))
  297. (wait connected?)
  298. (test-connection mq listening-sock))))
  299. ;; Consider the case where a service starts, has bound its socket
  300. ;; but is not yet listening, and a client connects.
  301. (test-assert "connect-unix, will connect when socket is listening"
  302. (call-with-socket-location
  303. (lambda (where config)
  304. (define listening-sock (socket PF_UNIX SOCK_STREAM 0))
  305. (define connected? (make-condition))
  306. (bind listening-sock AF_UNIX where)
  307. (define mq (connect/test config connected?))
  308. (yield-many)
  309. (listen listening-sock 1)
  310. (wait connected?)
  311. (test-connection mq listening-sock))))
  312. ;; Consider the case where a client starts before a service.
  313. (test-assert "connect-unix, will connect when socket is bound (and listening)"
  314. (call-with-socket-location
  315. (lambda (where config)
  316. (define listening-sock (socket PF_UNIX SOCK_STREAM 0))
  317. (define connected? (make-condition))
  318. (define mq (connect/test config connected?))
  319. (yield-many)
  320. (bind listening-sock AF_UNIX where)
  321. (listen listening-sock 1)
  322. (wait connected?)
  323. (test-connection mq listening-sock))))
  324. ;; Consider the case where a service starts and stops,
  325. ;; a client connects and the service restarts.
  326. (test-assert
  327. "connect-unix, will connect even if there's an old socket lying around"
  328. (call-with-socket-location
  329. (lambda (where config)
  330. (let ((old-sock (socket PF_UNIX SOCK_STREAM 0)))
  331. (bind old-sock AF_UNIX where)
  332. (close-port old-sock))
  333. (define connected? (make-condition))
  334. (define mq (connect/test config connected?))
  335. (yield-many)
  336. (define listening-sock (socket PF_UNIX SOCK_STREAM 0))
  337. (yield-many)
  338. ;; Delete the old socket, otherwise the 'bind' below results in ‘address alreay in use’
  339. (delete-file where)
  340. (yield-many)
  341. (bind listening-sock AF_UNIX where)
  342. (yield-many)
  343. (listen listening-sock 1)
  344. (wait connected?)
  345. (test-connection mq listening-sock))))
  346. ;; Consider the case where GNUnet version N uses stream sockets,
  347. ;; GNUnet version M uses datagram sockets, the system initially
  348. ;; uses GNUnet version N, a client for version M is started
  349. ;; (initially failing to connect to the server), then the system
  350. ;; switches to GNUnet version M.
  351. (test-assert
  352. "connect-unix, will connect even if previous socket is different type"
  353. (call-with-socket-location
  354. (lambda (where config)
  355. (define old-sock (socket PF_UNIX SOCK_DGRAM 0))
  356. (bind old-sock AF_UNIX where)
  357. ;; Datagram sockets don't support 'listen', so don't
  358. ;; call 'listen' with 'old-sock'.
  359. (define connected? (make-condition))
  360. (define mq (connect/test config connected?))
  361. (yield-many)
  362. (close-port old-sock)
  363. (delete-file where)
  364. (define new-sock (socket PF_UNIX SOCK_STREAM 0))
  365. (bind new-sock AF_UNIX where)
  366. (listen new-sock 1)
  367. (wait connected?)
  368. (test-connection mq new-sock))))
  369. ;; Consider a system that creates directories and the socket
  370. ;; with world-unreadable, world-unexecutable permissions at
  371. ;; first and makes the permissions more permissive later.
  372. (test-assert
  373. "connect-unix, will connect even if permissions are temporarily wrong"
  374. (call-with-temporary-directory
  375. (lambda (tmpdir)
  376. ;; Permissions on sockets can be unreliable on some systems,
  377. ;; so modify the permissions of a directory instead.
  378. (define subdir (in-vicinity tmpdir "dir"))
  379. (mkdir subdir)
  380. (define where (in-vicinity subdir "sock.et"))
  381. (define listening-sock (socket PF_UNIX SOCK_STREAM 0))
  382. (bind listening-sock AF_UNIX where)
  383. (listen listening-sock 1)
  384. (chmod subdir #o000) ; unreadable
  385. (define connected? (make-condition))
  386. (define mq (connect/test (make-config where) connected?))
  387. (yield-many)
  388. ;; make it readable again
  389. ;; (and writable such that 'tmpdir' can be deleted).
  390. (chmod subdir #o700)
  391. (wait connected?)
  392. (test-connection mq listening-sock))))
  393. (test-assert "port->message-queue, can send/receive between pairs"
  394. (run-fibers
  395. (lambda ()
  396. ;; Create two message queues connected to each other
  397. ;; over a socket pair. Send '1' over the first message queue
  398. ;; and expect to receive it from the second queue, and send '0'
  399. ;; over the second message queue and expect to receive it from
  400. ;; the first.
  401. (define pair (socketpair AF_UNIX SOCK_STREAM 0))
  402. ;; As 'fibers' is used instead of POSIX threads, set O_NONBLOCK.
  403. (make-nonblocking! (car pair))
  404. (make-nonblocking! (cdr pair))
  405. (define received/0 #f)
  406. (define received/1 #f)
  407. (define done/0 (make-condition))
  408. (define done/1 (make-condition))
  409. (define handlers/0
  410. (message-handlers
  411. (simple-handler 0
  412. (lambda (slice)
  413. (assert (not received/0))
  414. (check-slice-equal slice #vu8(0 4 0 0))
  415. (set! received/0 #t)
  416. (signal-condition! done/0)))))
  417. (define handlers/1
  418. (message-handlers
  419. (simple-handler 1
  420. (lambda (slice)
  421. (assert (not received/1))
  422. (check-slice-equal slice #vu8(0 4 0 1))
  423. (set! received/1 #t)
  424. (signal-condition! done/1)))))
  425. (define error-handler no-error-handler)
  426. (define mq/0 (port->message-queue (car pair) handlers/0 error-handler))
  427. (define mq/1 (port->message-queue (cdr pair) handlers/1 error-handler))
  428. (send-message! mq/0 (bv-slice/read-write #vu8(0 4 0 1)))
  429. (send-message! mq/1 (bv-slice/read-write #vu8(0 4 0 0)))
  430. (wait done/0)
  431. (wait done/1)
  432. #t)))
  433. (define (two-sockets)
  434. (define sp (socketpair AF_UNIX SOCK_STREAM 0))
  435. (make-nonblocking! (car sp))
  436. (make-nonblocking! (cdr sp))
  437. (values (car sp) (cdr sp)))
  438. (test-assert "input eof detected --> handle-input/output! stops (port->message-queue)"
  439. (call-with-spawner/wait
  440. (lambda (spawn)
  441. (define-values (alpha beta) (two-sockets))
  442. (define end-of-file (make-condition))
  443. (define (error-handler . e)
  444. (assert (equal? e '(input:regular-end-of-file)))
  445. ;; only one end-of-file notification
  446. (assert (signal-condition! end-of-file)))
  447. (define mq/alpha
  448. (port->message-queue alpha no-handlers error-handler
  449. #:spawn spawn))
  450. ;; Give the fibers started by 'port->message-queue' a chance to block.
  451. (yield-many)
  452. ;; Let 'beta' stop writing, such that 'alpha' receives an end-of-file.
  453. ;; But keep the 'write' end of 'alpha' / 'read' end of 'beta' open to
  454. ;; complicate matters.
  455. (shutdown beta 1)
  456. (wait end-of-file)
  457. (define sent? (make-atomic-box #f))
  458. ;; Attempt to write a message, even though the connection is (half-)closed.
  459. ;; It should not actually be sent.
  460. (send-message! mq/alpha (bv-slice/read-write #vu8(0 4 0 0))
  461. #:notify-sent!
  462. (lambda ()
  463. ;; strictly speaking, this does not mean the message was
  464. ;; sent, but it's close enough for this test's purposes.
  465. (atomic-box-set! sent? #t)))
  466. ;; Give 'handle-output!' a chance to (faultively) sent the message.
  467. (yield-many)
  468. (sleep 0.1) ; the yield-many above is apparently insufficient
  469. (assert (not (atomic-box-ref sent?)))
  470. ;; If it didn't try to sent the message, that presumably means the
  471. ;; 'handle-output!' fiber has completed.
  472. #t)
  473. ;; Should make 'yield-many' less fragile.
  474. #:parallelism 1))
  475. (define (%false-if-broken-pipe thunk)
  476. "Call @var{thunk} in an environment where EPIPE system errors are caught.
  477. If an EPIPE system error is raised, return #f."
  478. (guard (c ((and (eq? 'system-error (exception-kind c))
  479. (= EPIPE (car (list-ref (exception-args c) 3))))
  480. #f))
  481. (thunk)))
  482. (define-syntax-rule (false-if-broken-pipe exp exp* ...)
  483. ;; See %false-if-broken-pipe
  484. (%false-if-broken-pipe
  485. (lambda ()
  486. exp exp* ...)))
  487. (test-assert "closed for writing --> handle-input! stops (port->message-queue)"
  488. (call-with-spawner/wait
  489. (lambda (spawn)
  490. (define-values (alpha beta) (two-sockets))
  491. (define received? (make-atomic-box #f))
  492. (define end-of-file (make-condition))
  493. (define (receive! slice)
  494. (assert (not (atomic-box-ref received?)))
  495. (atomic-box-set! received? #t)
  496. (error "shouldn't be received"))
  497. (define (error-handler . e)
  498. (pk 'e e)
  499. (assert (equal? e '(input:regular-end-of-file)))
  500. ;; only one end-of-file notification
  501. (assert (signal-condition! end-of-file)))
  502. (define mq/alpha
  503. (port->message-queue alpha
  504. (message-handlers
  505. (simple-handler 0 receive!))
  506. error-handler
  507. #:spawn spawn))
  508. ;; Give the new fibers a chance to block.
  509. (yield-many)
  510. ;; Let 'beta' stop reading, such that 'alpha' is closed for writing.
  511. ;; But keep the 'read' end of 'alpha' open to complicate matters.
  512. (shutdown beta 0)
  513. ;; TODO: fibers doesn't have an option for waiting for EPOLLRDHUP
  514. ;; or EPOLLERR, so the code cannot immediately detect that 'alpha'
  515. ;; cannot be written to anymore. Instead, 'handle-output!' will
  516. ;; detect the unwritability when it tries to write something.
  517. (send-message! mq/alpha (bv-slice/read-write #vu8(0 4 0 9)))
  518. ;; The end-of-file error should be injected, even though the socket
  519. ;; is half-duplex and only the write end is closed, because message
  520. ;; queues do not have a notion of half-duplex connections.
  521. (pk 'waiting)
  522. (wait end-of-file)
  523. ;; Attempt to read a message (after buffering a message), even though
  524. ;; the connection is half-closed. Ignore broken pipe errors here:
  525. ;; if a ‘broken pipe’ error happens here, that means ALPHA was closed,
  526. ;; which is correct (tested in "port is closed at output").
  527. (false-if-broken-pipe (put-bytevector beta #vu8(0 4 0 0)))
  528. ;; As the 'handle-input!' fiber should have exited already, 'receive!'
  529. ;; shouldn't be called.
  530. (yield-many)
  531. (sleep 0.1) ; might not be necessary anymore
  532. #t)
  533. ;; Should make 'yield-many' less fragile.
  534. #:parallelism 1))
  535. ;; This detects the absence of the parametrisation of 'current-write-waiter'.
  536. (test-assert "writer blocking and closed for reading --> all fibers stop"
  537. (call-with-spawner/wait
  538. (lambda (spawn)
  539. (define-values (alpha beta) (two-sockets))
  540. ;; Fill the writing pipe, such that the writing fiber will block.
  541. #;(fcntl alpha SO_SNDBUF 1) ; doesn't work on sockets on Linux ..
  542. ;; Simply writing a byte isn't sufficient, as the kernel can
  543. ;; impose a minimum buffer size.
  544. (define old-waiter (current-write-waiter))
  545. (let/ec ec
  546. (parameterize ((current-write-waiter
  547. (lambda (port)
  548. (if (eq? port alpha)
  549. (ec)
  550. ;; maybe a backtrace
  551. (old-waiter port)))))
  552. (define bv (make-bytevector 4096))
  553. (let loop ()
  554. (put-bytevector alpha bv)
  555. (loop))))
  556. (define closed-condition (make-condition))
  557. (define (error-handler e)
  558. (assert (eq? e 'input:regular-end-of-file))
  559. (unless (signal-condition! closed-condition)
  560. (error "already saw end of file")))
  561. (define mq (port->message-queue alpha no-handlers error-handler
  562. #:spawn spawn))
  563. (send-message! mq (bv-slice/read-write #vu8(0 4 0 0)))
  564. (define (notify-sent)
  565. ;; if the mq-stream implementation implemented buffering by itself
  566. ;; this would actually be possible.
  567. (error "impossible, should be blocking by now!"))
  568. (send-message! mq
  569. (bv-slice/read-write #vu8(0 4 0 0))
  570. #:notify-sent! notify-sent)
  571. (pk 'send2)
  572. ;; Give the write fiber a chance to block.
  573. (yield-many)
  574. (sleep 0.1)
  575. (shutdown alpha 0)
  576. #t)
  577. ;; Should make 'yield-many' less fragile.
  578. #:parallelism 1))
  579. ;; ^ if this test blocks, that means not all fibers have stopped
  580. (test-assert "output buffers are flushed"
  581. (let^ ((<-- (alpha beta) (two-sockets))
  582. ;; In Guile, by default, new sockets are unbuffered.
  583. ;; Add a buffer.
  584. (<-- _ (setvbuf alpha 'block 64))
  585. (<-- _ (setvbuf beta 'block 64))
  586. (! mq (make-message-queue no-handlers no-error-handler
  587. (lambda (_) (values))))
  588. ;; Send a message. As the message is smaller than the buffer size,
  589. ;; it will be buffered unless 'handle-output!' takes special action.
  590. (! _ (send-message! mq (slice/read-only (bv-slice/read-write #vu8(0 4 0 0)))))
  591. (! waited?
  592. (let/ec ec
  593. (let ((old-waiter (current-write-waiter)))
  594. (parameterize ((current-write-waiter
  595. (lambda (p)
  596. (if (eq? p alpha)
  597. (ec #t)
  598. (old-waiter p)))))
  599. (handle-output! mq alpha
  600. (lambda ()
  601. (pk 'waiting...)
  602. ((pk 'escaping ec) #f)
  603. (pk 'escaped!)))))))
  604. ;; If HANDLE-OUTPUT! blocked, that meant the underlying system call
  605. ;; was called, so the kernel got (some of the) data and all is well
  606. ;; -- except that the kernel buffer size of 4 bytes seems rather tiny.
  607. (? waited?
  608. (format (current-error-port) "≤4 bytes seems rather small~%")
  609. #t)
  610. (! old-read-waiter (current-read-waiter)) )
  611. ;; If waited? is false, that means HANDLE-OUTPUT! succeeded and now
  612. ;; the bytes are in Guile's or the kernel's buffers. Test if they
  613. ;; are in the kernel's.
  614. (let/ec ec
  615. (equal? #vu8(0 4 0 0)
  616. (parameterize ((current-read-waiter
  617. (lambda (p)
  618. (if (eq? p beta)
  619. (ec #f)
  620. (old-read-waiter p)))))
  621. (get-bytevector-some beta))))))
  622. (define (error-handler/regular . e)
  623. (match e
  624. ('(input:regular-end-of-file) (values))
  625. (_ (error "what ~a" e))))
  626. (test-assert "port is closed at input eof"
  627. (call-with-spawner/wait
  628. (lambda (spawn)
  629. (define-values (alpha beta) (two-sockets))
  630. (define q (port->message-queue alpha no-handlers error-handler/regular
  631. #:spawn spawn))
  632. (shutdown alpha 0)
  633. (yield-many)
  634. (sleep 0.05) ;; XXX yield-many above is unsufficient
  635. (port-closed? alpha))
  636. #:parallelism 1)) ; to make the use of yield-many less fragile
  637. (test-assert "port is closed at output eof"
  638. (call-with-spawner/wait
  639. (lambda (spawn)
  640. (define-values (alpha beta) (two-sockets))
  641. (define mq (port->message-queue alpha no-handlers error-handler/regular
  642. #:spawn spawn))
  643. (shutdown alpha 1)
  644. ;; XXX It's not possible for the output eof to be waited for currently,
  645. ;; so attempt to send a message to wake up the writing fiber.
  646. (send-message! mq (bv-slice/read-write #vu8(0 4 0 0)))
  647. (yield-many)
  648. (sleep 0.05) ;; XXX yield-many above is unsufficient
  649. (port-closed? alpha))
  650. #:parallelism 1)) ; to make the use of yield-many less fragile
  651. (test-assert "port is closed at input/output eof"
  652. (call-with-spawner/wait
  653. (lambda (spawn)
  654. (define-values (alpha beta) (two-sockets))
  655. (define q (port->message-queue alpha no-handlers error-handler/regular
  656. #:spawn spawn))
  657. (shutdown alpha 2)
  658. (yield-many)
  659. (sleep 0.05) ;; XXX yield-many above is unsufficient
  660. (port-closed? alpha))
  661. #:parallelism 1)) ; to make the use of yield-many less fragile
  662. (test-assert "fibers stop and port closed after close! (directly after creation)"
  663. (let^ ((<-- (alpha beta) (two-sockets)))
  664. (call-with-spawner/wait
  665. (lambda (spawn)
  666. (define q (port->message-queue alpha no-handlers error-handler/regular
  667. #:spawn spawn))
  668. (close-queue! q))
  669. #:parallelism 1)
  670. (port-closed? alpha)))
  671. (test-assert "fibers stop and port closed after close! (some times passes)"
  672. (let^ ((<-- (alpha beta) (two-sockets)))
  673. (call-with-spawner/wait
  674. (lambda (spawn)
  675. (define q (port->message-queue alpha no-handlers error-handler/regular
  676. #:spawn spawn))
  677. (yield-many)
  678. (sleep 0.01)
  679. (close-queue! q))
  680. #:parallelism 1)
  681. (port-closed? alpha)))
  682. (test-assert "can close while still connecting (--> interrupted)"
  683. (call-with-socket-location
  684. (lambda (where config)
  685. (call-with-spawner/wait
  686. (lambda (spawn)
  687. (define interrupted? #f)
  688. (define cond (make-condition))
  689. (define (error-handler . e)
  690. (match e
  691. ('(connection:interrupted)
  692. (begin
  693. (pk 'interrupted)
  694. (assert (not interrupted?))
  695. (set! interrupted? #t)
  696. (signal-condition! cond)))
  697. (_ (error "what ~a" e))))
  698. (define mq (connect/fibers config "service" no-handlers error-handler
  699. #:spawn spawn))
  700. (close-queue! mq)
  701. (wait cond)
  702. #t)))))
  703. (test-assert "can close after being connected (--> regular-end-of-file)"
  704. (call-with-socket-location
  705. (lambda (where config)
  706. (call-with-spawner/wait
  707. (lambda (spawn)
  708. (define connected? #f)
  709. (define connected-condition (make-condition))
  710. (define disconnected? #f)
  711. (define disconnected-condition (make-condition))
  712. (define (error-handler . e)
  713. (match e
  714. ('(connection:connected)
  715. (pk 'connected)
  716. (assert (not connected?))
  717. (set! connected? #t)
  718. (signal-condition! connected-condition))
  719. ('(input:regular-end-of-file)
  720. (assert connected?)
  721. (assert (not disconnected?))
  722. (set! disconnected? #t)
  723. (signal-condition! disconnected-condition))
  724. (_ (error "what ~a" e))))
  725. (define mq (connect/fibers config "service" no-handlers error-handler
  726. #:spawn spawn))
  727. (spawn
  728. (lambda ()
  729. (define listening-sock (socket PF_UNIX SOCK_STREAM 0))
  730. (bind listening-sock AF_UNIX where)
  731. (listen listening-sock 1)
  732. ;; Make it non-blocking (because guile-fibers is used)
  733. (make-nonblocking! listening-sock)
  734. ;; Not actually interested in the return value
  735. (accept listening-sock)))
  736. (wait connected-condition)
  737. (assert (not disconnected?))
  738. (close-queue! mq)
  739. (wait disconnected-condition)
  740. #t)))))
  741. (test-end "mq-stream")