sema.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276
  1. // Copyright 2009 The Go Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. // Semaphore implementation exposed to Go.
  5. // Intended use is provide a sleep and wakeup
  6. // primitive that can be used in the contended case
  7. // of other synchronization primitives.
  8. // Thus it targets the same goal as Linux's futex,
  9. // but it has much simpler semantics.
  10. //
  11. // That is, don't think of these as semaphores.
  12. // Think of them as a way to implement sleep and wakeup
  13. // such that every sleep is paired with a single wakeup,
  14. // even if, due to races, the wakeup happens before the sleep.
  15. //
  16. // See Mullender and Cox, ``Semaphores in Plan 9,''
  17. // http://swtch.com/semaphore.pdf
  18. package runtime
  19. import "unsafe"
  20. // Asynchronous semaphore for sync.Mutex.
  21. type semaRoot struct {
  22. lock mutex
  23. head *sudog
  24. tail *sudog
  25. nwait uint32 // Number of waiters. Read w/o the lock.
  26. }
  27. // Prime to not correlate with any user patterns.
  28. const semTabSize = 251
  29. var semtable [semTabSize]struct {
  30. root semaRoot
  31. pad [_CacheLineSize - unsafe.Sizeof(semaRoot{})]byte
  32. }
  33. // Called from sync/net packages.
  34. func asyncsemacquire(addr *uint32) {
  35. semacquire(addr, true)
  36. }
  37. func asyncsemrelease(addr *uint32) {
  38. semrelease(addr)
  39. }
  40. // Called from runtime.
  41. func semacquire(addr *uint32, profile bool) {
  42. gp := getg()
  43. if gp != gp.m.curg {
  44. gothrow("semacquire not on the G stack")
  45. }
  46. // Easy case.
  47. if cansemacquire(addr) {
  48. return
  49. }
  50. // Harder case:
  51. // increment waiter count
  52. // try cansemacquire one more time, return if succeeded
  53. // enqueue itself as a waiter
  54. // sleep
  55. // (waiter descriptor is dequeued by signaler)
  56. s := acquireSudog()
  57. root := semroot(addr)
  58. t0 := int64(0)
  59. s.releasetime = 0
  60. if profile && blockprofilerate > 0 {
  61. t0 = cputicks()
  62. s.releasetime = -1
  63. }
  64. for {
  65. lock(&root.lock)
  66. // Add ourselves to nwait to disable "easy case" in semrelease.
  67. xadd(&root.nwait, 1)
  68. // Check cansemacquire to avoid missed wakeup.
  69. if cansemacquire(addr) {
  70. xadd(&root.nwait, -1)
  71. unlock(&root.lock)
  72. break
  73. }
  74. // Any semrelease after the cansemacquire knows we're waiting
  75. // (we set nwait above), so go to sleep.
  76. root.queue(addr, s)
  77. goparkunlock(&root.lock, "semacquire")
  78. if cansemacquire(addr) {
  79. break
  80. }
  81. }
  82. if s.releasetime > 0 {
  83. blockevent(int64(s.releasetime)-t0, 3)
  84. }
  85. releaseSudog(s)
  86. }
  87. func semrelease(addr *uint32) {
  88. root := semroot(addr)
  89. xadd(addr, 1)
  90. // Easy case: no waiters?
  91. // This check must happen after the xadd, to avoid a missed wakeup
  92. // (see loop in semacquire).
  93. if atomicload(&root.nwait) == 0 {
  94. return
  95. }
  96. // Harder case: search for a waiter and wake it.
  97. lock(&root.lock)
  98. if atomicload(&root.nwait) == 0 {
  99. // The count is already consumed by another goroutine,
  100. // so no need to wake up another goroutine.
  101. unlock(&root.lock)
  102. return
  103. }
  104. s := root.head
  105. for ; s != nil; s = s.next {
  106. if s.elem == unsafe.Pointer(addr) {
  107. xadd(&root.nwait, -1)
  108. root.dequeue(s)
  109. break
  110. }
  111. }
  112. unlock(&root.lock)
  113. if s != nil {
  114. if s.releasetime != 0 {
  115. s.releasetime = cputicks()
  116. }
  117. goready(s.g)
  118. }
  119. }
  120. func semroot(addr *uint32) *semaRoot {
  121. return &semtable[(uintptr(unsafe.Pointer(addr))>>3)%semTabSize].root
  122. }
  123. func cansemacquire(addr *uint32) bool {
  124. for {
  125. v := atomicload(addr)
  126. if v == 0 {
  127. return false
  128. }
  129. if cas(addr, v, v-1) {
  130. return true
  131. }
  132. }
  133. }
  134. func (root *semaRoot) queue(addr *uint32, s *sudog) {
  135. s.g = getg()
  136. s.elem = unsafe.Pointer(addr)
  137. s.next = nil
  138. s.prev = root.tail
  139. if root.tail != nil {
  140. root.tail.next = s
  141. } else {
  142. root.head = s
  143. }
  144. root.tail = s
  145. }
  146. func (root *semaRoot) dequeue(s *sudog) {
  147. if s.next != nil {
  148. s.next.prev = s.prev
  149. } else {
  150. root.tail = s.prev
  151. }
  152. if s.prev != nil {
  153. s.prev.next = s.next
  154. } else {
  155. root.head = s.next
  156. }
  157. s.elem = nil
  158. s.next = nil
  159. s.prev = nil
  160. }
  161. // Synchronous semaphore for sync.Cond.
  162. type syncSema struct {
  163. lock mutex
  164. head *sudog
  165. tail *sudog
  166. }
  167. // Syncsemacquire waits for a pairing syncsemrelease on the same semaphore s.
  168. func syncsemacquire(s *syncSema) {
  169. lock(&s.lock)
  170. if s.head != nil && s.head.nrelease > 0 {
  171. // Have pending release, consume it.
  172. var wake *sudog
  173. s.head.nrelease--
  174. if s.head.nrelease == 0 {
  175. wake = s.head
  176. s.head = wake.next
  177. if s.head == nil {
  178. s.tail = nil
  179. }
  180. }
  181. unlock(&s.lock)
  182. if wake != nil {
  183. wake.next = nil
  184. goready(wake.g)
  185. }
  186. } else {
  187. // Enqueue itself.
  188. w := acquireSudog()
  189. w.g = getg()
  190. w.nrelease = -1
  191. w.next = nil
  192. w.releasetime = 0
  193. t0 := int64(0)
  194. if blockprofilerate > 0 {
  195. t0 = cputicks()
  196. w.releasetime = -1
  197. }
  198. if s.tail == nil {
  199. s.head = w
  200. } else {
  201. s.tail.next = w
  202. }
  203. s.tail = w
  204. goparkunlock(&s.lock, "semacquire")
  205. if t0 != 0 {
  206. blockevent(int64(w.releasetime)-t0, 2)
  207. }
  208. releaseSudog(w)
  209. }
  210. }
  211. // Syncsemrelease waits for n pairing syncsemacquire on the same semaphore s.
  212. func syncsemrelease(s *syncSema, n uint32) {
  213. lock(&s.lock)
  214. for n > 0 && s.head != nil && s.head.nrelease < 0 {
  215. // Have pending acquire, satisfy it.
  216. wake := s.head
  217. s.head = wake.next
  218. if s.head == nil {
  219. s.tail = nil
  220. }
  221. if wake.releasetime != 0 {
  222. wake.releasetime = cputicks()
  223. }
  224. wake.next = nil
  225. goready(wake.g)
  226. n--
  227. }
  228. if n > 0 {
  229. // enqueue itself
  230. w := acquireSudog()
  231. w.g = getg()
  232. w.nrelease = int32(n)
  233. w.next = nil
  234. w.releasetime = 0
  235. if s.tail == nil {
  236. s.head = w
  237. } else {
  238. s.tail.next = w
  239. }
  240. s.tail = w
  241. goparkunlock(&s.lock, "semarelease")
  242. releaseSudog(w)
  243. } else {
  244. unlock(&s.lock)
  245. }
  246. }
  247. func syncsemcheck(sz uintptr) {
  248. if sz != unsafe.Sizeof(syncSema{}) {
  249. print("runtime: bad syncSema size - sync=", sz, " runtime=", unsafe.Sizeof(syncSema{}), "\n")
  250. gothrow("bad syncSema size")
  251. }
  252. }