tweave.nim 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. discard """
  2. outputsub: '''Success'''
  3. cmd: '''nim c --gc:arc --threads:on $file'''
  4. disabled: "bsd"
  5. """
  6. # bug #13936
  7. import std/atomics
  8. when defined(nimPreviewSlimSystem):
  9. import std/[assertions, typedthreads]
  # Size in bytes of the memory block a channel is expected to live in.
  # `initialize` checks the item size against it and the test harness
  # allocates exactly this many bytes for its channel.
  const MemBlockSize = 256

  type
    ChannelSPSCSingle* = object
      ## Channel with room for a single buffered item.
      ##
      ## `trySend` is documented for use from the producer thread only and
      ## `tryRecv` from the consumer thread only.
      ## Because the object ends with an `UncheckedArray`, the storage for
      ## the item must be provided by whoever allocates the channel.
      full {.align: 128.}: Atomic[bool]
        ## true while `buffer` holds an item that was sent but not yet received
      itemSize*: uint8
        ## size in bytes of the item type, as set by `initialize`
      buffer* {.align: 8.}: UncheckedArray[byte]
        ## raw storage of the buffered item
  # Copy hook: any attempt to copy a channel is rejected at compile time.
  # Spelled `=copy` because the bare `=` name for this hook is deprecated.
  proc `=copy`(
      dest: var ChannelSPSCSingle,
      source: ChannelSPSCSingle
    ) {.error: "A channel cannot be copied".}
  proc initialize*(chan: var ChannelSPSCSingle, itemsize: SomeInteger) {.inline.} =
    ## Prepares `chan` to carry items of `itemsize` bytes and marks it empty.
    ##
    ## If ChannelSPSCSingle is embedded in another data structure it must
    ## be the last field, because it ends with an UncheckedArray.
    ##
    ## `itemsize` must fit in a `uint8`, and the item must fit between the
    ## start of `buffer` and the end of a `MemBlockSize`-byte block.
    ## Both limits are checked with `assert`, so they are only enforced
    ## when assertions are enabled.
    assert itemsize.int in 0 .. int high(uint8)
    # `buffer` is aligned, so it begins further in than the raw sizes of
    # `full` and `itemSize` suggest; measure the room from its real offset.
    assert offsetOf(ChannelSPSCSingle, buffer) + itemsize.int <= MemBlockSize
    chan.itemSize = uint8 itemsize
    # Relaxed is what the original code uses here.
    # NOTE(review): presumably the channel is published to other threads
    # by a later synchronizing operation - confirm at the call sites.
    chan.full.store(false, moRelaxed)
  func isEmpty*(chan: var ChannelSPSCSingle): bool {.inline.} =
    ## Returns true when no item is currently buffered, i.e. when the
    ## `full` flag (read with acquire ordering) is not set.
    let occupied = chan.full.load(moAcquire)
    result = not occupied
  func tryRecv*[T](chan: var ChannelSPSCSingle, dst: var T): bool {.inline.} =
    ## Attempts to take the buffered item out of the channel.
    ##
    ## When an item is present it is copied into `dst`, the channel is
    ## marked empty and true is returned.
    ## When the channel is empty, `dst` is left untouched and false is
    ## returned.
    ##
    ## ⚠ Use only in the consumer thread that reads from the channel.

    # `T` must have the size the channel was initialized with; a
    # zero-sized dummy object is accepted on a channel of item size 1.
    assert (sizeof(T) == chan.itemsize.int) or
      (sizeof(T) == 0 and chan.itemsize == 1)

    if chan.full.load(moAcquire):
      # Read the item first, then release the slot.
      dst = cast[ptr T](chan.buffer.addr)[]
      chan.full.store(false, moRelease)
      result = true
  func trySend*[T](chan: var ChannelSPSCSingle, src: sink T): bool {.inline.} =
    ## Attempts to place `src` into the channel.
    ##
    ## When the channel is empty the item is written into the buffer, the
    ## channel is marked full and true is returned.
    ## When an item is still buffered nothing is written and false is
    ## returned.
    ##
    ## ⚠ Use only in the producer thread that writes to the channel.

    # `T` must have the size the channel was initialized with; a
    # zero-sized dummy object is accepted on a channel of item size 1.
    assert (sizeof(T) == chan.itemsize.int) or
      (sizeof(T) == 0 and chan.itemsize == 1)

    if not chan.full.load(moAcquire):
      # Write the item first, then publish it by setting the flag.
      cast[ptr T](chan.buffer.addr)[] = src
      chan.full.store(true, moRelease)
      result = true
  61. # Sanity checks
  62. # ------------------------------------------------------------------------------
  when isMainModule:
    # Self-test: one sender thread and one receiver thread exchange ten
    # integers through a single shared channel.
    when not compileOption("threads"):
      {.error: "This requires --threads:on compilation flag".}

    # Retries `trySend` until it succeeds, running `body` after each
    # failed attempt.
    template sendLoop[T](chan: var ChannelSPSCSingle,
                         data: sink T,
                         body: untyped): untyped =
      while not chan.trySend(data):
        body

    # Retries `tryRecv` until it succeeds, running `body` after each
    # failed attempt.
    template recvLoop[T](chan: var ChannelSPSCSingle,
                         data: var T,
                         body: untyped): untyped =
      while not chan.tryRecv(data):
        body

    type
      WorkerKind = enum
        Sender
        Receiver

      ThreadArgs = object
        ID: WorkerKind                # role played by the thread
        chan: ptr ChannelSPSCSingle   # channel shared by both threads

    # Runs `body` only in the thread whose role is `id`. Dirty on purpose:
    # it reads the `args` parameter of the enclosing proc.
    template Worker(id: WorkerKind, body: untyped): untyped {.dirty.} =
      if args.ID == id:
        body

    proc thread_func(args: ThreadArgs) =
      # Entry point of both threads; `args.ID` selects the role.
      #
      #   Receiver            Sender
      #   --------            ------
      #   <- chan             chan <- 42
      #   <- chan             chan <- 53
      #   <- chan             chan <- 64
      #   ...                 ...

      Worker(Receiver):
        var val: int
        for j in 0 ..< 10:
          # Busy loop, in prod we might want to yield the core/thread timeslice
          args.chan[].recvLoop(val):
            discard
          echo " Receiver got: ", val
          # Items must arrive in sending order: 42, 53, 64, ...
          doAssert val == 42 + j*11

      Worker(Sender):
        # The channel must start out empty.
        doAssert args.chan.full.load(moRelaxed) == false
        for j in 0 ..< 10:
          let val = 42 + j*11
          # Busy loop, in prod we might want to yield the core/thread timeslice
          args.chan[].sendLoop(val):
            discard
          echo "Sender sent: ", val

    proc main() =
      # Allocates the channel in shared memory, runs both workers to
      # completion and frees the channel again.
      echo "Testing if 2 threads can send data"
      echo "-----------------------------------"

      let channel = cast[ptr ChannelSPSCSingle](allocShared(MemBlockSize))
      channel[].initialize(itemSize = sizeof(int))

      var workers: array[2, Thread[ThreadArgs]]
      createThread(workers[0], thread_func,
                   ThreadArgs(ID: Receiver, chan: channel))
      createThread(workers[1], thread_func,
                   ThreadArgs(ID: Sender, chan: channel))
      joinThreads(workers)

      freeShared(channel)

      echo "-----------------------------------"
      echo "Success"

    main()