mq.scm 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931
  1. ;; This file is part of GNUnet.
  2. ;; Copyright (C) 2012, 2018 GNUnet e.V.
  3. ;; Copyright (C) 2021 Maxime Devos
  4. ;;
  5. ;; GNUnet is free software: you can redistribute it and/or modify it
  6. ;; under the terms of the GNU Affero General Public License as published
  7. ;; by the Free Software Foundation, either version 3 of the License,
  8. ;; or (at your option) any later version.
  9. ;;
  10. ;; GNUnet is distributed in the hope that it will be useful, but
  11. ;; WITHOUT ANY WARRANTY; without even the implied warranty of
  12. ;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  13. ;; Affero General Public License for more details.
  14. ;;
  15. ;; You should have received a copy of the GNU Affero General Public License
  16. ;; along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. ;;
  18. ;; SPDX-License-Identifier: AGPL3.0-or-later
  19. ;; Author: Florian Dold
  20. ;; Author: Christian Grothoff
  21. ;; Author: Maxime Devos
  22. (define-module (tests mq))
  23. (use-modules (ice-9 control)
  24. (tests utils) ; for conservative-gc?
  25. (fibers conditions)
  26. (fibers)
  27. (srfi srfi-1)
  28. (srfi srfi-26)
  29. (srfi srfi-39)
  30. (srfi srfi-43)
  31. (srfi srfi-64)
  32. (srfi srfi-111)
  33. ((rnrs base) #:select (assert mod))
  34. ((rnrs exceptions) #:select (guard))
  35. ((rnrs conditions) #:select (condition-who))
  36. ((rnrs arithmetic bitwise)
  37. #:select (bitwise-ior))
  38. (gnu gnunet netstruct syntactic)
  39. ((gnu gnunet netstruct procedural)
  40. #:select (u32/big))
  41. (gnu gnunet mq prio-prefs)
  42. (gnu gnunet mq prio-prefs2)
  43. (gnu gnunet util struct)
  44. (gnu gnunet utils bv-slice)
  45. ((gnu extractor enum)
  46. #:select (symbol-value value->index))
  47. (gnu gnunet message protocols)
  48. (gnu gnunet mq)
  49. (gnu gnunet mq envelope)
  50. (gnu gnunet mq handler)
  51. (quickcheck property)
  52. (quickcheck)
  53. (quickcheck arbitrary))
  54. ;; The client code sends the numbers 0 to
  55. ;; NUM_TRANSMISSIONS-1 over the message queue.
  56. ;; The notify-sent callback verifies whether
  57. ;; messages were sent in-order. The fake
  58. ;; ‘sender’ procedure verifies whether it received
  59. ;; the messages in order.
  60. ;;
  61. ;; Note that in more realistic situations, some
  62. ;; queueing can happen! A very special case
  63. ;; is being tested here.
  64. (define NUM_TRANSMISSIONS 100)
  65. (eval-when (expand load eval)
  66. (define-type /:msg:our-test:dummy
  67. (structure/packed
  68. (synopsis "A test message, containing an index")
  69. (documentation
  70. "The first time, a message with index 0 is sent.
  71. Then each time the index is increased.")
  72. (field (header /:message-header))
  73. (field (index u32/big)))))
  74. (define (index->dummy i)
  75. (let ((s (make-slice/read-write
  76. (sizeof /:msg:our-test:dummy '()))))
  77. (set%! /:msg:our-test:dummy '(header type) s
  78. (value->index (symbol-value message-type msg:util:dummy)))
  79. (set%! /:msg:our-test:dummy '(header size) s
  80. (sizeof /:msg:our-test:dummy '()))
  81. (set%! /:msg:our-test:dummy '(index) s i)
  82. s))
  83. (define (dummy->index s)
  84. (read% /:msg:our-test:dummy '(index) s))
  85. (define (client mq notify-sent-box sent-box)
  86. (define (see i)
  87. (if (= i (unbox notify-sent-box))
  88. (set-box! notify-sent-box (+ 1 i))
  89. (error "messages were sent out-of-order (index: ~a) (notify-sent: ~a) (sent: ~a)"
  90. i
  91. (unbox notify-sent-box)
  92. (unbox sent-box))))
  93. (do ((i 0 (+ 1 i)))
  94. ((>= i NUM_TRANSMISSIONS))
  95. (send-message! mq (index->dummy i)
  96. #:notify-sent! (cut see i))))
  97. (define (send-proc notify-sent-box sent-box envelope)
  98. (attempt-irrevocable-sent!
  99. envelope
  100. ((go message priority)
  101. (let ((index (dummy->index message)))
  102. (unless (= (+ index 1) (unbox notify-sent-box))
  103. (error "messages are being sent out-of-order or with queueing (index: ~a) (notify-sent: ~a) (sent: ~a)"
  104. index
  105. (unbox notify-sent-box)
  106. (unbox sent-box)))
  107. (unless (= index (unbox sent-box))
  108. (error "dunno (index: ~a) (notify-sent: ~a) (sent: ~a)"
  109. index
  110. (unbox notify-sent-box)
  111. (unbox sent-box)))
  112. (set-box! sent-box (+ 1 index))
  113. (values)))
  114. ((cancelled)
  115. (error "how did this cancelling happen?"))
  116. ((already-sent)
  117. (error "forgot to remove envelope from queue"))))
  118. (define no-handlers (message-handlers))
  119. (define (no-error-handler . what)
  120. (error "were did this error come from?"))
  121. (test-equal "in-order, no queuing"
  122. (list NUM_TRANSMISSIONS NUM_TRANSMISSIONS)
  123. (let* ((notify-sent-box (box 0))
  124. (sent-box (box 0))
  125. (mq (make-message-queue no-handlers
  126. no-error-handler
  127. (make-one-by-one-sender
  128. (cut send-proc notify-sent-box sent-box <>)))))
  129. (client mq notify-sent-box sent-box)
  130. (list (unbox notify-sent-box) (unbox sent-box))))
  131. ;; Simulate buffering, by only ‘truly’ sending after each three messages.
  132. ;; This does _not_ test the queueing code! See the next test for that.
  133. ;; Make sure messages aren't lost, and they are still be sent in-order!
  134. ;;
  135. ;; (Assuming the sender is well-implemented. A buggy sender could send
  136. ;; things out-of-order.)
  137. (define (send-proc2 notify-sent-box sent-box mod-box stashed envelope)
  138. (let ((first-free (vector-index not stashed))
  139. (expected-filled (unbox mod-box)))
  140. (unless (= (or first-free 0) expected-filled)
  141. (error "did we lose a message?"))
  142. (set-box! mod-box (mod (+ 1 expected-filled) (vector-length stashed)))
  143. (if (not first-free)
  144. (begin
  145. (vector-map!
  146. (lambda (i envelope)
  147. (send-proc notify-sent-box sent-box envelope)
  148. #f)
  149. stashed)
  150. (vector-set! stashed 0 envelope))
  151. ;; @var{stashed} is not yet full; send the
  152. ;; envelope later!
  153. (vector-set! stashed first-free envelope))
  154. (values)))
  155. (define (expected-sent n k)
  156. (- n (let ((mod (mod n k)))
  157. (if (= mod 0)
  158. k
  159. mod))))
  160. (define k 3)
  161. (test-equal "in-order, some buffering"
  162. (map (cut expected-sent <> 3)
  163. (list NUM_TRANSMISSIONS NUM_TRANSMISSIONS))
  164. (let* ((notify-sent-box (box 0))
  165. (sent-box (box 0))
  166. (mod-box (box 0))
  167. (stashed (make-vector k #f))
  168. (mq (make-message-queue no-handlers
  169. no-error-handler
  170. (make-one-by-one-sender
  171. (cut send-proc2 notify-sent-box sent-box mod-box stashed <>)))))
  172. (client mq notify-sent-box sent-box)
  173. (list (unbox notify-sent-box) (unbox sent-box))))
  174. ;; Test the queueing code by only flushing
  175. ;; the queue every N messages. Also check,
  176. ;; using flushing-allowed?, that sending
  177. ;; only happens when we expect it to happen.
  178. (define flushing-allowed?
  179. (make-parameter #f))
  180. (define (send-proc/check notify-sent-box sent-box envelope)
  181. (assert (flushing-allowed?))
  182. (send-proc notify-sent-box sent-box envelope))
  183. (define (make-every-n proc k)
  184. "Make a sender using @var{proc} every @var{k}
  185. invocations, and at other times doing nothing."
  186. ;; Should theoretically be an atomic, but the test is singly-threaded,
  187. ;; so don't bother.
  188. (define n-mod-k 0)
  189. (lambda (mq)
  190. (assert (not (flushing-allowed?)))
  191. (set! n-mod-k (+ 1 n-mod-k))
  192. (when (>= n-mod-k k)
  193. (set! n-mod-k 0)
  194. (parameterize ((flushing-allowed? #t))
  195. (proc mq)))))
  196. (test-equal "in-order, some queueing"
  197. (map (cut expected-sent <> 3)
  198. (list NUM_TRANSMISSIONS NUM_TRANSMISSIONS))
  199. (let* ((notify-sent-box (box 0))
  200. (sent-box (box 0))
  201. (mq (make-message-queue no-handlers
  202. no-error-handler
  203. (make-every-n
  204. (make-one-by-one-sender
  205. (cut send-proc/check notify-sent-box sent-box <>))
  206. 3))))
  207. (client mq notify-sent-box sent-box)
  208. (list (unbox notify-sent-box) (unbox sent-box))))
  209. ;; Test that concurrency interacts well with queueing.
  210. ;;
  211. ;; The situation we consider, is a number
  212. ;; of different threads concurrently sending messages.
  213. ;; The test verifies whether all messages were, in fact, sent.
  214. ;;
  215. ;; To make things complicated, some queueing is introduced.
  216. ;; The sender will only proceed each time the current thread
  217. ;; has tried to send @var{k/thread} messages, and the sender
  218. ;; will only try to send at most @code{(+ k/thread e)}, where
  219. ;; @var{e} is a random number from @var{e/min} to @var{e/max}.
  220. ;; The tests detect the following potential problems in the code
  221. ;; by crashing (but not always, so you may need to re-run a few
  222. ;; times, three times tends to be enough in practice for me):
  223. ;;
  224. ;; * Replacing 'old' with 'queue' in
  225. ;; unless (pfds:queue-empty? old)
  226. ;; * Replacing 'old' with 'queue' in
  227. ;; receive (envelope new) (pfds:dequeue old)
  228. ;; * Replacing the first 'old' with 'queue' in
  229. ;; (eq? old (swap! old new)), in 'make-one-by-one-sender'
  230. ;; * Replacing the second 'old' with 'queue' in
  231. ;; (eq? old (swap! old new)), in 'make-one-by-one-sender'
  232. ;; * Replacing 'old' by 'queue' in
  233. ;; (pfds:enqueue old envelope)
  234. ;; (only detected infrequently, odds 1 to 7 or so)
  235. ;; * Replacing the first 'old' by 'queue' in
  236. ;; (eq? old (swap-queue! old new))
  237. ;; in 'send-message!'
  238. ;; * Replacing the second 'old' by 'queue' in
  239. ;; (eq? old (swap-queue! old new))
  240. ;; in 'send-message!'
  241. ;;
  242. ;; The following problems cause a hang when testing:
  243. ;; * Replacing 'queue' by 'old' in (spin queue)
  244. ;; in 'make-one-by-one-sender'
  245. ;; * Replacing 'queue' by 'old' in (spin queue)
  246. ;; in 'send-message!'.
  247. ;;
  248. ;; The following problems cause a hang in a preceding
  249. ;; test:
  250. ;;
  251. ;; * Replacing the first 'old' by 'new' in
  252. ;; (eq? old (swap-queue! old new))
  253. ;; in 'send-message!'
  254. ;; * Replacing 'queue' by 'old' in
  255. ;; (spin queue)
  256. ;; in 'send-message!'
  257. ;; * Replacing 'queue' by 'new' in
  258. ;; (spin queue)
  259. ;; in 'send-message!'
  260. ;;
  261. ;; Some potential problems currently remain undetected:
  262. ;; * Replacing the 'new' by 'queue' in
  263. ;; (pfds:queue-length new)
  264. ;;
  265. ;; However, it is only for printing a warning
  266. ;; when the queue is rather full. Being slightly
  267. ;; off in queue length shouldn't be a problem
  268. ;; there, as the 'maximum reasonable bound'
  269. ;; is just a wild guess and not some exact
  270. ;; cut-off.
  271. ;;
  272. ;; Cancellation will be tested separately.
  273. (define random/thread
  274. (fluid->parameter (make-unbound-fluid)))
  275. (define k/thread 12)
  276. (define e/min 2)
  277. (define e/max 7)
  278. (define N_MESSAGES 1000)
  279. (define N_THREAD 40)
  280. ;; List of (thread-index . message-index)
  281. ;; received by current thread.
  282. (define received/thread
  283. (fluid->parameter (make-unbound-fluid)))
  284. (define i/thread
  285. (fluid->parameter (make-unbound-fluid)))
  286. ;; The sending is happening concurrently,
  287. ;; so in-order delivery cannot be guaranteed.
  288. ;; Thus, requesting in-order delivery seems
  289. ;; silly.
  290. (define prio
  291. (bitwise-ior
  292. (prio->integer 'prio:background)
  293. (value->index (symbol-value priority-preference
  294. pref:out-of-order))))
  295. (eval-when (expand load eval)
  296. (define-type /:msg:our-test:concurrency
  297. (structure/packed
  298. (synopsis "A test message, containing an thread and message index")
  299. (documentation
  300. "The first time, a message with index 0 is sent.
  301. Then each time the index is increased.")
  302. (field (header /:message-header))
  303. (field (index u32/big))
  304. (field (thread u32/big)))))
  305. (define (make-thread-message thread-index i)
  306. (let ((s (make-slice/read-write
  307. (sizeof /:msg:our-test:concurrency '()))))
  308. (set%! /:msg:our-test:concurrency '(header type) s
  309. (value->index (symbol-value message-type msg:util:dummy)))
  310. (set%! /:msg:our-test:concurrency '(header size) s
  311. (sizeof /:msg:our-test:concurrency '()))
  312. (set%! /:msg:our-test:concurrency '(index) s i)
  313. (set%! /:msg:our-test:concurrency '(thread) s thread-index)
  314. s))
  315. (define (decode-thread-message s)
  316. (cons (read% /:msg:our-test:concurrency '(thread) s)
  317. (read% /:msg:our-test:concurrency '(index) s)))
  318. (define (make-every-n/thread proc k)
  319. "Make a sender using @var{proc} every @var{k}
  320. invocations, and at other times doing nothing.
  321. @code{i/thread} is used for state."
  322. (lambda (mq)
  323. (assert (not (flushing-allowed?)))
  324. (i/thread (+ 1 (i/thread)))
  325. (when (>= (i/thread) k)
  326. (i/thread 0)
  327. (parameterize ((flushing-allowed? #t))
  328. (proc mq)))))
  329. (define (thread mq thread-index)
  330. (parameterize ((received/thread '())
  331. (i/thread 0)
  332. (random/thread
  333. (seed->random-state thread-index)))
  334. (do ((i 0 (+ 1 i)))
  335. ((>= i N_MESSAGES))
  336. (send-message! mq (make-thread-message thread-index i)
  337. #:priority prio))
  338. (received/thread)))
  339. (define (make-restricted-sender how-many make-sender inner-proc)
  340. "Make a sender that, when called, tries to send @code{(how-many)}
  341. messages, using @var{make-sender} and @var{inner-proc}."
  342. (define escape-thunk
  343. (fluid->parameter (make-unbound-fluid)))
  344. (define count
  345. (fluid->parameter (make-unbound-fluid)))
  346. (define max-count
  347. (fluid->parameter (make-unbound-fluid)))
  348. (define (count!)
  349. (count (+ 1 (count)))
  350. (when (= (count) (max-count))
  351. (count 0)
  352. ((escape-thunk))))
  353. (lambda (mq)
  354. (let/ec ec
  355. (parameterize ((max-count (how-many))
  356. (count 0)
  357. (escape-thunk ec))
  358. ((make-sender
  359. (lambda (envelope)
  360. (inner-proc envelope)
  361. ;; Check 'count' AFTER some things
  362. ;; have been sent! Otherwise, the
  363. ;; message is lost.
  364. (count!)
  365. (values)))
  366. mq)))))
  367. ;; After all threads have exited, we'll ‘drain’ out
  368. ;; the left-overs.
  369. (define drain? (make-parameter #f))
  370. (define (make-sender/choice y? x y)
  371. "When @code{(y?)}, send with @code{y}. Else, send
  372. with @code{x}."
  373. (lambda (mq)
  374. (if (y?)
  375. (y mq)
  376. (x mq))))
  377. (define (inner-send envelope)
  378. (attempt-irrevocable-sent!
  379. envelope
  380. ((go message priority)
  381. (received/thread (cons (decode-thread-message message)
  382. (received/thread)))
  383. (values))
  384. ((cancelled) (error "what/cancelled"))
  385. ((already-sent) (error "what/already-sent"))))
  386. (define sender/thread
  387. (make-sender/choice
  388. drain?
  389. (make-every-n/thread
  390. (make-restricted-sender
  391. (lambda ()
  392. (+ k/thread e/min
  393. (random (- e/max e/min -1) (random/thread))))
  394. make-one-by-one-sender
  395. inner-send)
  396. k/thread)
  397. (make-one-by-one-sender inner-send)))
  398. (define (results->array per-thread-sent)
  399. ;; A bit array of messages the send function has
  400. ;; seen.
  401. (define a (make-typed-array 'b #f N_MESSAGES N_THREAD))
  402. (define (visit-message message)
  403. (define thread-index (car message))
  404. (define message-index (cdr message))
  405. (array-set! a #t message-index thread-index))
  406. (define (visit-per-thread _ messages)
  407. (for-each visit-message messages))
  408. (vector-for-each visit-per-thread per-thread-sent)
  409. a)
  410. (define (array-missing a)
  411. (define missing '())
  412. (array-index-map! a
  413. (lambda (i j)
  414. (define found (array-ref a i j))
  415. (unless found
  416. (set! missing `((,i . ,j) . ,missing)))
  417. found))
  418. missing)
  419. ;; But possibly out-of-order!
  420. (test-equal "nothing lost when sending concurrently"
  421. '()
  422. (let* ((mq (make-message-queue no-handlers
  423. no-error-handler
  424. sender/thread))
  425. (thread-indices (iota N_THREAD))
  426. ;; The ‘drained-out’ messages are put
  427. ;; at index N_THREAD.
  428. (results (make-vector (+ 1 N_THREAD)))
  429. (done? (vector-unfold (lambda (_) (make-condition)) N_THREAD))
  430. (ready? (make-condition)))
  431. (run-fibers
  432. (lambda ()
  433. (define (run! thread-index)
  434. (spawn-fiber
  435. (lambda ()
  436. (wait ready?)
  437. (vector-set! results thread-index
  438. (thread mq thread-index))
  439. (signal-condition! (vector-ref done? thread-index)))))
  440. (for-each run! thread-indices)
  441. ;; Try to start every thread at the same time!
  442. (signal-condition! ready?)
  443. ;; #:drain? #t with parallelism is broken,
  444. ;; see <https://github.com/wingo/fibers/issues/47>.
  445. ;; So explicitely wait on each fiber.
  446. (vector-for-each (lambda (_ c) (wait c)) done?))
  447. #:drain? #t
  448. ;; No need
  449. #:install-suspendable-ports? #f
  450. ;; More interrupts --> more switches
  451. ;; --> more test coverage. At least,
  452. ;; that's the idea. Not really tested.
  453. #:hz 700)
  454. ;; Drain the left-overs.
  455. (parameterize ((drain? #t)
  456. (received/thread '()))
  457. (try-send-again! mq)
  458. (vector-set! results N_THREAD (received/thread)))
  459. (array-missing (results->array results))))
  460. ;; Test message injection / handling (no exceptions).
  461. (define mhp (vector-unfold (lambda (_) (make-parameter #f)) 4))
  462. (define mhv (vector-unfold (lambda (_) (make-parameter #f)) 4))
  463. (define mh (apply message-handlers
  464. (map (lambda (i)
  465. (make-message-handler i (lambda (p) (p))
  466. (lambda _
  467. (apply ((vector-ref mhv i)) _))
  468. (lambda _
  469. (apply ((vector-ref mhp i)) _))))
  470. (iota (vector-length mhp)))))
  471. ;; FWIW, passing #f is not really allowed.
  472. (define mq (make-message-queue mh #f #f))
  473. (test-eq "when injecting, handled message is eq?"
  474. #t
  475. (let ((m (make-slice/read-write 40))) ; could as wel have been 20
  476. (set%! /:message-header '(size)
  477. (slice-slice m 0 (sizeof /:message-header '())) 40)
  478. (let/ec ec
  479. (parameterize (((vector-ref mhp 0)
  480. (lambda (x)
  481. (ec (eq? x m))))
  482. ((vector-ref mhv 0)
  483. (lambda (x)
  484. (assert (eq? x m))
  485. #t)))
  486. (inject-message! mq m)
  487. 'unreachable))))
  488. (test-eq "non-zero types ok"
  489. #t
  490. (let ((s (make-slice/read-write (sizeof /:message-header '()))))
  491. (set%! /:message-header '(type) s 3)
  492. (set%! /:message-header '(size) s (sizeof /:message-header '()))
  493. (let/ec ec
  494. (parameterize (((vector-ref mhp 3)
  495. (lambda (x)
  496. (ec (eq? x s))))
  497. ((vector-ref mhv 3)
  498. (lambda (x)
  499. (assert (eq? s x))
  500. #t)))
  501. (inject-message! mq s)
  502. 'unreachable))))
  503. (test-equal "verifier & handler only called once"
  504. '(1 . 1)
  505. (let ((hcount 0)
  506. (vcount 0)
  507. (s (make-slice/read-write (sizeof /:message-header '()))))
  508. (set%! /:message-header '(size) s (sizeof /:message-header '()))
  509. (parameterize (((vector-ref mhp 0)
  510. (lambda (x)
  511. (set! hcount (+ 1 hcount))
  512. (assert (eq? x s))
  513. (values)))
  514. ((vector-ref mhv 0)
  515. (lambda (x)
  516. (set! vcount (+ 1 vcount))
  517. (assert (eq? x s))
  518. #t)))
  519. (inject-message! mq s)
  520. (cons hcount vcount))))
  521. ;; Test message injection (exceptions)
  522. (test-equal "missing header error"
  523. (map (lambda (i)
  524. `(missing-header-error (size . ,i)
  525. (who . inject-message!)))
  526. (iota (sizeof /:message-header '())))
  527. (map (lambda (i)
  528. (guard (e ((missing-header-error? e)
  529. `(missing-header-error
  530. (size . ,(missing-header-error-received-size e))
  531. (who . ,(condition-who e)))))
  532. (inject-message! mq (make-slice/read-write i))
  533. 'unreachable))
  534. (iota (sizeof /:message-header '()))))
  535. (test-assert "[prop] wrong header size error"
  536. (quickcheck
  537. (property ((%real-length $natural)
  538. (supposed-length $natural))
  539. (let* ((real-length (+ (sizeof /:message-header '())
  540. %real-length))
  541. (supposed-length (if (= real-length supposed-length)
  542. (+ 1 supposed-length)
  543. supposed-length))
  544. (s (make-slice/read-write real-length))
  545. (sheader (slice-slice s 0 (sizeof /:message-header '()))))
  546. (set%! /:message-header '(size)
  547. (slice-slice s 0 (sizeof /:message-header '()))
  548. supposed-length)
  549. (guard (e ((size-mismatch-error? e)
  550. (equal? `(,(size-mismatch-error-expected-size e)
  551. ,(size-mismatch-error-received-size e)
  552. ,(condition-who e))
  553. `(,supposed-length
  554. ,real-length
  555. inject-message!))))
  556. (inject-message! mq s)
  557. #f)))))
  558. ;; TODO: what if the message is (otherwise) malformed?
  559. ;; Test the following part of the send-message! docstring:
  560. ;; ‘After normal execution, the message envelope is returned,
  561. ;; but in case of an exception (for example, an out-of-memory exception
  562. ;; during the handling of a @code{&overly-full-queue-warning}), it is
  563. ;; possible the envelope isn't returned even though it has been enqueued
  564. ;; and it might perhaps be sent.
  565. (test-assert "returned envelope and sent envelope are equal"
  566. (let* ((returned-values #f)
  567. (sent-values #f)
  568. (sender
  569. (make-one-by-one-sender
  570. (lambda envelope-arguments
  571. (assert (eq? sent-values #f))
  572. (set! sent-values envelope-arguments)
  573. (values))))
  574. (mq (make-message-queue #f #f sender))
  575. (msg (index->dummy #xdeadbeef)))
  576. (call-with-values
  577. (lambda () (send-message! mq msg))
  578. (lambda return-values
  579. (set! returned-values return-values)))
  580. (and (equal? sent-values returned-values)
  581. (= (length sent-values) 1)
  582. (every envelope? sent-values))))
  583. ;; Strictly speaking, this test is allowed to fail
  584. ;; (as it is only ‘might’, not ‘it must be possible’),
  585. ;; but it seems a good idea to check our understanding is correct.
  586. (test-assert "message might be enqueued & sent but not returned"
  587. (let* ((enqueued? #f)
  588. (flush? (make-parameter #f))
  589. (sender/flush
  590. (make-one-by-one-sender
  591. (lambda (envelope)
  592. (set! enqueued? envelope)
  593. (values))))
  594. (sender/hold
  595. (lambda _ (values)))
  596. (sender (make-sender/choice flush? sender/hold
  597. sender/flush))
  598. (mq (make-message-queue #f #f sender))
  599. (msg (index->dummy 0))
  600. (exceptional #f)
  601. (enveloped #f))
  602. (with-exception-handler
  603. (lambda (_)
  604. (assert exceptional)
  605. (assert (envelope? enqueued?))
  606. (assert (not enveloped)))
  607. (lambda ()
  608. (with-exception-handler
  609. (lambda (e)
  610. (if (overly-full-queue-warning? e)
  611. (begin
  612. (set! exceptional #t)
  613. (parameterize ((flush? #t))
  614. (try-send-again! mq)
  615. ;; At least in the current implementation,
  616. ;; this holds.
  617. ;;
  618. ;; In a different implementation, the
  619. ;; envelope could be enqueued after
  620. ;; checking the queue length.
  621. (assert enqueued?))
  622. (throw 'out-of-memory))
  623. (raise-exception e #:continuable? #t)))
  624. (lambda ()
  625. (call-with-values
  626. (lambda ()
  627. (parameterize ((%suspicious-length 0))
  628. (send-message! mq msg)))
  629. (lambda args (set! enveloped args))))
  630. #:unwind? #f))
  631. #:unwind? #t
  632. #:unwind-for-type 'out-of-memory)
  633. (and enqueued? exceptional
  634. (not enveloped))))
  635. ;; Message cancellation.
  636. ;;
  637. ;; Cancellation is already tested in tests/envelope.scm.
  638. ;; However, the interaction with message queues has not
  639. ;; yet been tested.
  640. ;; This test detected (not detected by previous tests):
  641. ;; * the cdr of the contents of messages+garbage/box
  642. ;; being initialised incorrectly in make-message-queue
  643. ;; * using car instead of cdr in increment-garbage&maybe-cleanup
  644. (test-assert "envelopes do not keep a strong reference to the message queue"
  645. (let* ((mq (make-message-queue #f #f (lambda _ (values))))
  646. (mq-guard (make-guardian))
  647. (envelope (send-message! mq (index->dummy 0))))
  648. (mq-guard mq)
  649. (attempt-cancel!
  650. envelope
  651. ((now-cancelled)
  652. (gc)
  653. (->bool (mq-guard)))
  654. ((already-cancelled) (error "what/cancelled"))
  655. ((already-sent) (error "what/sent")))))
  656. (define (count-guardian/cancelled guardian)
  657. "Count how many elements are present in @var{guardian}.
  658. While we're at it, verify each element is a cancelled envelope."
  659. (let loop ((n 0))
  660. (let ((e (guardian)))
  661. (cond ((not e) n)
  662. ((envelope-peek-cancelled? e) (loop (+ n 1)))
  663. (#t (error "a not-cancelled envelope was freed!"))))))
  664. (define (count-guardian/uncancelled guardian)
  665. "Count how many elements are present in @var{guardian}.
  666. While we're at it, verify each element is an uncancelled envelope."
  667. (let loop ((n 0))
  668. (let ((e (guardian)))
  669. (cond ((not e) n)
  670. ((not (envelope-peek-cancelled? e)) (loop (+ n 1)))
  671. (#t (error "a cancelled envelope was freed!"))))))
  672. ;; This is a variant of
  673. ;; "the one-by-one message sender removes cancelled envelopes",
  674. ;; using guardians, and purely testing the cancelling code, and
  675. ;; not the sending code.
  676. ;;
  677. ;; It detects the following mutations:
  678. ;; * removing (spin queue+garbage) after swap! in the 'envelope-peek-cancelled?'
  679. ;; branch of 'make-one-by-one-sender'
  680. (test-assert "cancelling envelopes eventually frees memory even if message sender is dead"
  681. (let* ((mq (make-message-queue #f #f (lambda _ (values))))
  682. (cancelled-guard (make-guardian))
  683. (uncancelled-guard (make-guardian)))
  684. ;; Add a bunch of messages.
  685. (let ((messages
  686. (map (lambda (i)
  687. (send-message! mq (index->dummy i)))
  688. (iota 50))))
  689. ;; Cancel most of them. This should trigger collection of
  690. ;; cancelled envelopes.
  691. (for-each
  692. (lambda (e)
  693. (cancelled-guard e)
  694. (attempt-cancel!
  695. e
  696. ((now-cancelled) (values))
  697. ((already-cancelled) (error "what/cancelled"))
  698. ((already-sent) (error "what/sent"))))
  699. (list-head messages 40)))
  700. ;; Move freed envelopes to the guardian.
  701. (gc)
  702. ;; How many were freed?
  703. (let ((freed/cancelled (count-guardian/cancelled cancelled-guard))
  704. (freed/uncancelled (count-guardian/uncancelled uncancelled-guard))
  705. (cancelled 40)
  706. (total 50))
  707. (pk 'total total 'cancelled cancelled 'freed/cancelled freed/cancelled
  708. 'freed/uncancelled freed/uncancelled
  709. 'queue-length (message-queue-length mq))
  710. ;; Only cancelled messages were supposed to be freed.
  711. (assert (= freed/uncancelled 0))
  712. (assert (<= freed/cancelled cancelled))
  713. ;; A large fraction of cancelled messages should be freed.
  714. (assert (>= (/ freed/cancelled cancelled) 7/8))
  715. ;; If the GC is exact, all messages removed from the message
  716. ;; queue (due to cancelling) should be removed.
  717. (unless (conservative-gc?)
  718. (assert (= freed/cancelled (- total (message-queue-length mq)))))
  719. #t)))
  720. (define sender/no-cancelled
  721. (make-one-by-one-sender
  722. (lambda (e)
  723. (pk 'ee e)
  724. (assert (not (envelope-peek-cancelled? e)))
  725. (values))))
  726. ;; Not strictly necessary (and also undocumented), but this should
  727. ;; improve the accuracy of the garbage counter. Maybe not trying
  728. ;; to send useless (cancelled) envelopes could help with performance
  729. ;; as well (untested)?
  730. ;;
  731. ;; Also, this caught a bug in (gnu gnunet mq) -- the procedure returned
  732. ;; by 'make-one-by-one-sender' went into an infinite loop if it encountered
  733. ;; a cancelled envelope.
  734. ;;
  735. ;; This tests detects negating the test
  736. ;; (eq? old (swap! old (cons old-queue incremented-garbage)))
  737. ;; in increment-garbage&maybe-cleanup.
  738. (test-assert "the one-by-one message sender removes cancelled envelopes"
  739. (let* ((flush? (make-parameter #f))
  740. (sender (make-sender/choice flush? (lambda _ (values))
  741. sender/no-cancelled))
  742. (mq (make-message-queue #f #f sender)))
  743. ;; Fill the queue with many uncancelled messages, such that
  744. ;; the logic for collecting cancelled envelopes doesn't kick in too early.
  745. (do ((i 0 (+ i 1)))
  746. ((>= i 30))
  747. (send-message! mq (index->dummy i)))
  748. (assert (= (message-queue-length mq) 30))
  749. ;; Now add some envelopes to the queue & cancel them.
  750. (do ((i 0 (+ i 1)))
  751. ((>= i 4))
  752. (attempt-cancel!
  753. (send-message! mq (index->dummy (+ 30 i)))
  754. ((now-cancelled) (values))
  755. ((already-cancelled) (error "what / cancelled"))
  756. ((already-sent) (error "what / sent"))))
  757. (assert (= (message-queue-length mq) 34))
  758. (parameterize ((flush? #t))
  759. (try-send-again! mq))
  760. (assert (= (message-queue-length mq) 0))
  761. (assert (= (%message-queue-garbagitude mq) 0))
  762. #t))
  763. ;; This is a variation of "nothing lost when sending concurrently",
  764. ;; but for cancelation.
  765. ;;
  766. ;; This test fails in case of the following mutations:
  767. ;; * replace 0 with 1 in (or some other number) in
  768. ;; (swap! old (cons filtered 0))
  769. ;; in increment-garbage&maybe-cleanup
  770. (test-assert "the (approximate) cancellation count is accurate, when not sending, even when cancelling concurrently (also, uncancelled messages are not lost)"
  771. (let* ((messages/cancellation 10000)
  772. (n/not-cancelled #f)
  773. (flush? (make-parameter #f))
  774. (sender/check (lambda (e)
  775. (unless (envelope-peek-cancelled? e)
  776. (set! n/not-cancelled (+ 1 n/not-cancelled)))
  777. (values)))
  778. (sender (make-sender/choice flush?
  779. (lambda _ (values))
  780. (make-one-by-one-sender sender/check)))
  781. (mq (make-message-queue #f #f sender))
  782. (ready? (make-condition))
  783. (done? (vector-unfold
  784. (lambda (_) (make-condition))
  785. (/ messages/cancellation 2)))
  786. (messages
  787. (with-exception-handler
  788. (lambda (e)
  789. (if (overly-full-queue-warning? e)
  790. (values)
  791. (raise-exception e #:continuable? #t)))
  792. (lambda ()
  793. (vector-unfold (compose (cut send-message! mq <>)
  794. index->dummy)
  795. messages/cancellation)))))
  796. (run-fibers
  797. (lambda ()
  798. ;; Cancel half of the messages, concurrently.
  799. ;; Only half of all the messages are cancelled,
  800. ;; to avoid resetting the garbage counter.
  801. (vector-for-each
  802. (lambda (i done? message)
  803. (when (< i (/ messages/cancellation 2))
  804. (spawn-fiber
  805. (lambda ()
  806. (wait ready?)
  807. (attempt-cancel!
  808. message
  809. ((now-cancelled)
  810. (signal-condition! done?)
  811. (values))
  812. ((already-cancelled)
  813. (signal-condition! done?)
  814. (error "what/cancelled"))
  815. ((already-sent)
  816. (signal-condition! done?)
  817. (error "what/sent")))))))
  818. done? messages)
  819. (signal-condition! ready?)
  820. (vector-for-each (lambda (_ c) (wait c)) done?))
  821. #:hz 4000)
  822. ;; Verify the estimate is accurate, at least in this
  823. ;; situation.
  824. (assert (= (pk 'garbagitude (%message-queue-garbagitude mq))
  825. (pk 'expected (/ messages/cancellation 2))))
  826. ;; Cancel more messages (until 7/8 are cancelled),
  827. ;; to trigger collection. While we're at, verify
  828. ;; the estimate is still correct.
  829. (do ((i (/ messages/cancellation 2) (+ i 1)))
  830. ((>= (/ i messages/cancellation) 7/8))
  831. (attempt-cancel!
  832. (vector-ref messages (pk 'iiii i))
  833. ((now-cancelled)
  834. ;; 3/4 is the (arbitrary) ratio at which
  835. ;; the garbage is thrown out
  836. (if (< (* 4 i) (* 3 messages/cancellation))
  837. (assert (= (%message-queue-garbagitude mq)
  838. (+ i 1)))
  839. (assert (= (%message-queue-garbagitude mq)
  840. (- i (* 3/4 messages/cancellation))))))
  841. ((already-cancelled) (error "what/cancelled2"))
  842. ((already-sent) (error "what/sent2"))))
  843. ;; Now send the envelopes, to verify uncancelled messages
  844. ;; are still in the queue.
  845. (parameterize ((flush? #t))
  846. (set! n/not-cancelled 0)
  847. (try-send-again! mq))
  848. (assert (= n/not-cancelled (* 1/8 messages/cancellation)))
  849. ;; As everything has been removed from the queue,
  850. ;; the estimate should now be zero.
  851. (assert (= (pk 'final-garbagitude (%message-queue-garbagitude mq))
  852. 0))
  853. #t))