waitgroup.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. // Copyright 2011 The Go Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package sync
  5. import (
  6. "sync/atomic"
  7. "unsafe"
  8. )
  9. // A WaitGroup waits for a collection of goroutines to finish.
  10. // The main goroutine calls Add to set the number of
  11. // goroutines to wait for. Then each of the goroutines
  12. // runs and calls Done when finished. At the same time,
  13. // Wait can be used to block until all goroutines have finished.
  14. type WaitGroup struct {
  15. m Mutex
  16. counter int32
  17. waiters int32
  18. sema *uint32
  19. }
  20. // WaitGroup creates a new semaphore each time the old semaphore
  21. // is released. This is to avoid the following race:
  22. //
  23. // G1: Add(1)
  24. // G1: go G2()
  25. // G1: Wait() // Context switch after Unlock() and before Semacquire().
  26. // G2: Done() // Release semaphore: sema == 1, waiters == 0. G1 doesn't run yet.
  27. // G3: Wait() // Finds counter == 0, waiters == 0, doesn't block.
  28. // G3: Add(1) // Makes counter == 1, waiters == 0.
  29. // G3: go G4()
  30. // G3: Wait() // G1 still hasn't run, G3 finds sema == 1, unblocked! Bug.
  31. // Add adds delta, which may be negative, to the WaitGroup counter.
  32. // If the counter becomes zero, all goroutines blocked on Wait are released.
  33. // If the counter goes negative, Add panics.
  34. //
  35. // Note that calls with a positive delta that occur when the counter is zero
  36. // must happen before a Wait. Calls with a negative delta, or calls with a
  37. // positive delta that start when the counter is greater than zero, may happen
  38. // at any time.
  39. // Typically this means the calls to Add should execute before the statement
  40. // creating the goroutine or other event to be waited for.
  41. // See the WaitGroup example.
  42. func (wg *WaitGroup) Add(delta int) {
  43. if raceenabled {
  44. _ = wg.m.state // trigger nil deref early
  45. if delta < 0 {
  46. // Synchronize decrements with Wait.
  47. raceReleaseMerge(unsafe.Pointer(wg))
  48. }
  49. raceDisable()
  50. defer raceEnable()
  51. }
  52. v := atomic.AddInt32(&wg.counter, int32(delta))
  53. if raceenabled {
  54. if delta > 0 && v == int32(delta) {
  55. // The first increment must be synchronized with Wait.
  56. // Need to model this as a read, because there can be
  57. // several concurrent wg.counter transitions from 0.
  58. raceRead(unsafe.Pointer(&wg.sema))
  59. }
  60. }
  61. if v < 0 {
  62. panic("sync: negative WaitGroup counter")
  63. }
  64. if v > 0 || atomic.LoadInt32(&wg.waiters) == 0 {
  65. return
  66. }
  67. wg.m.Lock()
  68. if atomic.LoadInt32(&wg.counter) == 0 {
  69. for i := int32(0); i < wg.waiters; i++ {
  70. runtime_Semrelease(wg.sema)
  71. }
  72. wg.waiters = 0
  73. wg.sema = nil
  74. }
  75. wg.m.Unlock()
  76. }
  77. // Done decrements the WaitGroup counter.
  78. func (wg *WaitGroup) Done() {
  79. wg.Add(-1)
  80. }
  81. // Wait blocks until the WaitGroup counter is zero.
  82. func (wg *WaitGroup) Wait() {
  83. if raceenabled {
  84. _ = wg.m.state // trigger nil deref early
  85. raceDisable()
  86. }
  87. if atomic.LoadInt32(&wg.counter) == 0 {
  88. if raceenabled {
  89. raceEnable()
  90. raceAcquire(unsafe.Pointer(wg))
  91. }
  92. return
  93. }
  94. wg.m.Lock()
  95. w := atomic.AddInt32(&wg.waiters, 1)
  96. // This code is racing with the unlocked path in Add above.
  97. // The code above modifies counter and then reads waiters.
  98. // We must modify waiters and then read counter (the opposite order)
  99. // to avoid missing an Add.
  100. if atomic.LoadInt32(&wg.counter) == 0 {
  101. atomic.AddInt32(&wg.waiters, -1)
  102. if raceenabled {
  103. raceEnable()
  104. raceAcquire(unsafe.Pointer(wg))
  105. raceDisable()
  106. }
  107. wg.m.Unlock()
  108. if raceenabled {
  109. raceEnable()
  110. }
  111. return
  112. }
  113. if raceenabled && w == 1 {
  114. // Wait must be synchronized with the first Add.
  115. // Need to model this is as a write to race with the read in Add.
  116. // As a consequence, can do the write only for the first waiter,
  117. // otherwise concurrent Waits will race with each other.
  118. raceWrite(unsafe.Pointer(&wg.sema))
  119. }
  120. if wg.sema == nil {
  121. wg.sema = new(uint32)
  122. }
  123. s := wg.sema
  124. wg.m.Unlock()
  125. runtime_Semacquire(s)
  126. if raceenabled {
  127. raceEnable()
  128. raceAcquire(unsafe.Pointer(wg))
  129. }
  130. }