fd_mutex.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. // Copyright 2013 The Go Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package net
  5. import "sync/atomic"
  6. // fdMutex is a specialized synchronization primitive
  7. // that manages lifetime of an fd and serializes access
  8. // to Read and Write methods on netFD.
  9. type fdMutex struct {
  10. state uint64
  11. rsema uint32
  12. wsema uint32
  13. }
  14. // fdMutex.state is organized as follows:
  15. // 1 bit - whether netFD is closed, if set all subsequent lock operations will fail.
  16. // 1 bit - lock for read operations.
  17. // 1 bit - lock for write operations.
  18. // 20 bits - total number of references (read+write+misc).
  19. // 20 bits - number of outstanding read waiters.
  20. // 20 bits - number of outstanding write waiters.
  21. const (
  22. mutexClosed = 1 << 0
  23. mutexRLock = 1 << 1
  24. mutexWLock = 1 << 2
  25. mutexRef = 1 << 3
  26. mutexRefMask = (1<<20 - 1) << 3
  27. mutexRWait = 1 << 23
  28. mutexRMask = (1<<20 - 1) << 23
  29. mutexWWait = 1 << 43
  30. mutexWMask = (1<<20 - 1) << 43
  31. )
  32. // Read operations must do RWLock(true)/RWUnlock(true).
  33. // Write operations must do RWLock(false)/RWUnlock(false).
  34. // Misc operations must do Incref/Decref. Misc operations include functions like
  35. // setsockopt and setDeadline. They need to use Incref/Decref to ensure that
  36. // they operate on the correct fd in presence of a concurrent Close call
  37. // (otherwise fd can be closed under their feet).
  38. // Close operation must do IncrefAndClose/Decref.
  39. // RWLock/Incref return whether fd is open.
  40. // RWUnlock/Decref return whether fd is closed and there are no remaining references.
  41. func (mu *fdMutex) Incref() bool {
  42. for {
  43. old := atomic.LoadUint64(&mu.state)
  44. if old&mutexClosed != 0 {
  45. return false
  46. }
  47. new := old + mutexRef
  48. if new&mutexRefMask == 0 {
  49. panic("net: inconsistent fdMutex")
  50. }
  51. if atomic.CompareAndSwapUint64(&mu.state, old, new) {
  52. return true
  53. }
  54. }
  55. }
  56. func (mu *fdMutex) IncrefAndClose() bool {
  57. for {
  58. old := atomic.LoadUint64(&mu.state)
  59. if old&mutexClosed != 0 {
  60. return false
  61. }
  62. // Mark as closed and acquire a reference.
  63. new := (old | mutexClosed) + mutexRef
  64. if new&mutexRefMask == 0 {
  65. panic("net: inconsistent fdMutex")
  66. }
  67. // Remove all read and write waiters.
  68. new &^= mutexRMask | mutexWMask
  69. if atomic.CompareAndSwapUint64(&mu.state, old, new) {
  70. // Wake all read and write waiters,
  71. // they will observe closed flag after wakeup.
  72. for old&mutexRMask != 0 {
  73. old -= mutexRWait
  74. runtime_Semrelease(&mu.rsema)
  75. }
  76. for old&mutexWMask != 0 {
  77. old -= mutexWWait
  78. runtime_Semrelease(&mu.wsema)
  79. }
  80. return true
  81. }
  82. }
  83. }
  84. func (mu *fdMutex) Decref() bool {
  85. for {
  86. old := atomic.LoadUint64(&mu.state)
  87. if old&mutexRefMask == 0 {
  88. panic("net: inconsistent fdMutex")
  89. }
  90. new := old - mutexRef
  91. if atomic.CompareAndSwapUint64(&mu.state, old, new) {
  92. return new&(mutexClosed|mutexRefMask) == mutexClosed
  93. }
  94. }
  95. }
  96. func (mu *fdMutex) RWLock(read bool) bool {
  97. var mutexBit, mutexWait, mutexMask uint64
  98. var mutexSema *uint32
  99. if read {
  100. mutexBit = mutexRLock
  101. mutexWait = mutexRWait
  102. mutexMask = mutexRMask
  103. mutexSema = &mu.rsema
  104. } else {
  105. mutexBit = mutexWLock
  106. mutexWait = mutexWWait
  107. mutexMask = mutexWMask
  108. mutexSema = &mu.wsema
  109. }
  110. for {
  111. old := atomic.LoadUint64(&mu.state)
  112. if old&mutexClosed != 0 {
  113. return false
  114. }
  115. var new uint64
  116. if old&mutexBit == 0 {
  117. // Lock is free, acquire it.
  118. new = (old | mutexBit) + mutexRef
  119. if new&mutexRefMask == 0 {
  120. panic("net: inconsistent fdMutex")
  121. }
  122. } else {
  123. // Wait for lock.
  124. new = old + mutexWait
  125. if new&mutexMask == 0 {
  126. panic("net: inconsistent fdMutex")
  127. }
  128. }
  129. if atomic.CompareAndSwapUint64(&mu.state, old, new) {
  130. if old&mutexBit == 0 {
  131. return true
  132. }
  133. runtime_Semacquire(mutexSema)
  134. // The signaller has subtracted mutexWait.
  135. }
  136. }
  137. }
  138. func (mu *fdMutex) RWUnlock(read bool) bool {
  139. var mutexBit, mutexWait, mutexMask uint64
  140. var mutexSema *uint32
  141. if read {
  142. mutexBit = mutexRLock
  143. mutexWait = mutexRWait
  144. mutexMask = mutexRMask
  145. mutexSema = &mu.rsema
  146. } else {
  147. mutexBit = mutexWLock
  148. mutexWait = mutexWWait
  149. mutexMask = mutexWMask
  150. mutexSema = &mu.wsema
  151. }
  152. for {
  153. old := atomic.LoadUint64(&mu.state)
  154. if old&mutexBit == 0 || old&mutexRefMask == 0 {
  155. panic("net: inconsistent fdMutex")
  156. }
  157. // Drop lock, drop reference and wake read waiter if present.
  158. new := (old &^ mutexBit) - mutexRef
  159. if old&mutexMask != 0 {
  160. new -= mutexWait
  161. }
  162. if atomic.CompareAndSwapUint64(&mu.state, old, new) {
  163. if old&mutexMask != 0 {
  164. runtime_Semrelease(mutexSema)
  165. }
  166. return new&(mutexClosed|mutexRefMask) == mutexClosed
  167. }
  168. }
  169. }
  170. // Implemented in runtime package.
  171. func runtime_Semacquire(sema *uint32)
  172. func runtime_Semrelease(sema *uint32)