sync.go 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309
  1. // Copyright (C) 2015 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package sync
  7. import (
  8. "fmt"
  9. "path/filepath"
  10. "runtime"
  11. "strconv"
  12. "strings"
  13. "sync"
  14. "sync/atomic"
  15. "time"
  16. "github.com/sasha-s/go-deadlock"
  17. )
  // clock is the time source consulted by the logging lock wrappers in this
  // file.
  type clock interface {
  	// Now reports the time this clock considers current.
  	Now() time.Time
  }

  // defaultClock is the clock used for every timestamp and duration computed in
  // this file. It is a nil *standardClock, which is usable because
  // standardClock's Now method never touches its receiver.
  var defaultClock clock = (*standardClock)(nil)
  // Mutex is the mutual-exclusion lock interface returned by NewMutex.
  type Mutex interface {
  	Lock()
  	Unlock()
  }
  // RWMutex is the reader/writer lock interface returned by NewRWMutex. It
  // extends Mutex with RLock and RUnlock.
  type RWMutex interface {
  	Mutex
  	RLock()
  	RUnlock()
  }
  // WaitGroup is the interface returned by NewWaitGroup. Its method set matches
  // the Add, Done and Wait methods of *sync.WaitGroup.
  type WaitGroup interface {
  	Add(int)
  	Done()
  	Wait()
  }
  36. func NewMutex() Mutex {
  37. if useDeadlock {
  38. return &deadlock.Mutex{}
  39. }
  40. if debug {
  41. mutex := &loggedMutex{}
  42. mutex.holder.Store(holder{})
  43. return mutex
  44. }
  45. return &sync.Mutex{}
  46. }
  47. func NewRWMutex() RWMutex {
  48. if useDeadlock {
  49. return &deadlock.RWMutex{}
  50. }
  51. if debug {
  52. mutex := &loggedRWMutex{
  53. readHolders: make(map[int][]holder),
  54. unlockers: make(chan holder, 1024),
  55. }
  56. mutex.holder.Store(holder{})
  57. return mutex
  58. }
  59. return &sync.RWMutex{}
  60. }
  61. func NewWaitGroup() WaitGroup {
  62. if debug {
  63. return &loggedWaitGroup{}
  64. }
  65. return &sync.WaitGroup{}
  66. }
  // holder records one lock-related call site. The zero value (empty at) is
  // rendered by String as "not held".
  type holder struct {
  	at   string    // "dir/file.go:line" of the call site, as built by getHolder
  	time time.Time // when the record was created
  	goid int       // ID of the goroutine that created the record
  }
  72. func (h holder) String() string {
  73. if h.at == "" {
  74. return "not held"
  75. }
  76. return fmt.Sprintf("at %s goid: %d for %s", h.at, h.goid, defaultClock.Now().Sub(h.time))
  77. }
  // loggedMutex wraps sync.Mutex and remembers where it was last locked, so
  // that Unlock can log hold times that reach the package threshold.
  type loggedMutex struct {
  	sync.Mutex
  	holder atomic.Value // stores a holder; the zero holder means "not held"
  }
  82. func (m *loggedMutex) Lock() {
  83. m.Mutex.Lock()
  84. m.holder.Store(getHolder())
  85. }
  86. func (m *loggedMutex) Unlock() {
  87. currentHolder := m.holder.Load().(holder)
  88. duration := defaultClock.Now().Sub(currentHolder.time)
  89. if duration >= threshold {
  90. l.Debugf("Mutex held for %v. Locked at %s unlocked at %s", duration, currentHolder.at, getHolder().at)
  91. }
  92. m.holder.Store(holder{})
  93. m.Mutex.Unlock()
  94. }
  95. func (m *loggedMutex) Holders() string {
  96. return m.holder.Load().(holder).String()
  97. }
  // loggedRWMutex wraps sync.RWMutex and tracks the write holder and the
  // outstanding read holders, so that slow acquisitions and long hold times
  // can be logged.
  type loggedRWMutex struct {
  	sync.RWMutex
  	holder         atomic.Value     // stores the write holder; the zero holder means "not held"
  	readHolders    map[int][]holder // RLock records, keyed by goroutine ID
  	readHoldersMut sync.Mutex       // guards readHolders
  	logUnlockers   int32            // accessed atomically; 1 while a Lock call is waiting for the write lock
  	unlockers      chan holder      // RUnlock call sites recorded while logUnlockers is 1
  }
  106. func (m *loggedRWMutex) Lock() {
  107. start := defaultClock.Now()
  108. atomic.StoreInt32(&m.logUnlockers, 1)
  109. m.RWMutex.Lock()
  110. atomic.StoreInt32(&m.logUnlockers, 0)
  111. holder := getHolder()
  112. m.holder.Store(holder)
  113. duration := holder.time.Sub(start)
  114. if duration > threshold {
  115. var unlockerStrings []string
  116. loop:
  117. for {
  118. select {
  119. case holder := <-m.unlockers:
  120. unlockerStrings = append(unlockerStrings, holder.String())
  121. default:
  122. break loop
  123. }
  124. }
  125. l.Debugf("RWMutex took %v to lock. Locked at %s. RUnlockers while locking:\n%s", duration, holder.at, strings.Join(unlockerStrings, "\n"))
  126. }
  127. }
  128. func (m *loggedRWMutex) Unlock() {
  129. currentHolder := m.holder.Load().(holder)
  130. duration := defaultClock.Now().Sub(currentHolder.time)
  131. if duration >= threshold {
  132. l.Debugf("RWMutex held for %v. Locked at %s unlocked at %s", duration, currentHolder.at, getHolder().at)
  133. }
  134. m.holder.Store(holder{})
  135. m.RWMutex.Unlock()
  136. }
  137. func (m *loggedRWMutex) RLock() {
  138. m.RWMutex.RLock()
  139. holder := getHolder()
  140. m.readHoldersMut.Lock()
  141. m.readHolders[holder.goid] = append(m.readHolders[holder.goid], holder)
  142. m.readHoldersMut.Unlock()
  143. }
  144. func (m *loggedRWMutex) RUnlock() {
  145. id := goid()
  146. m.readHoldersMut.Lock()
  147. current := m.readHolders[id]
  148. if len(current) > 0 {
  149. m.readHolders[id] = current[:len(current)-1]
  150. }
  151. m.readHoldersMut.Unlock()
  152. if atomic.LoadInt32(&m.logUnlockers) == 1 {
  153. holder := getHolder()
  154. select {
  155. case m.unlockers <- holder:
  156. default:
  157. l.Debugf("Dropped holder %s as channel full", holder)
  158. }
  159. }
  160. m.RWMutex.RUnlock()
  161. }
  162. func (m *loggedRWMutex) Holders() string {
  163. output := m.holder.Load().(holder).String() + " (writer)"
  164. m.readHoldersMut.Lock()
  165. for _, holders := range m.readHolders {
  166. for _, holder := range holders {
  167. output += "\n" + holder.String() + " (reader)"
  168. }
  169. }
  170. m.readHoldersMut.Unlock()
  171. return output
  172. }
  // loggedWaitGroup wraps sync.WaitGroup; its Wait method logs waits that
  // reach the package threshold.
  type loggedWaitGroup struct {
  	sync.WaitGroup
  }
  176. func (wg *loggedWaitGroup) Wait() {
  177. start := defaultClock.Now()
  178. wg.WaitGroup.Wait()
  179. duration := defaultClock.Now().Sub(start)
  180. if duration >= threshold {
  181. l.Debugf("WaitGroup took %v at %s", duration, getHolder())
  182. }
  183. }
  184. func getHolder() holder {
  185. _, file, line, _ := runtime.Caller(2)
  186. file = filepath.Join(filepath.Base(filepath.Dir(file)), filepath.Base(file))
  187. return holder{
  188. at: fmt.Sprintf("%s:%d", file, line),
  189. goid: goid(),
  190. time: defaultClock.Now(),
  191. }
  192. }
  // goid returns the ID of the calling goroutine, parsed from the
  // "goroutine N ..." header that runtime.Stack writes. It returns -1 when the
  // ID field is not a valid integer.
  func goid() int {
  	var stack [64]byte
  	written := runtime.Stack(stack[:], false)
  	header := strings.TrimPrefix(string(stack[:written]), "goroutine ")
  	fields := strings.Fields(header)
  	if id, err := strconv.Atoi(fields[0]); err == nil {
  		return id
  	}
  	return -1
  }
  // TimeoutCond is a variant on Cond. It has roughly the same semantics
  // regarding L: the lock must be held both when calling Broadcast and when
  // calling TimeoutCondWaiter.Wait.
  //
  // Call Broadcast to broadcast to all waiters on the TimeoutCond. Call
  // SetupWait to create a TimeoutCondWaiter configured with the given timeout,
  // which can then be used to listen for broadcasts.
  type TimeoutCond struct {
  	L  sync.Locker
  	ch chan struct{} // created by Wait when nil; closed and reset to nil by Broadcast
  }
  // TimeoutCondWaiter allows a consumer to wait on a TimeoutCond with a
  // timeout. Wait may be called multiple times, and returns true every time
  // the TimeoutCond is broadcast to. Once the configured timeout expires, Wait
  // returns false.
  //
  // Call Stop to release resources once the TimeoutCondWaiter is no longer
  // needed.
  type TimeoutCondWaiter struct {
  	c     *TimeoutCond
  	timer *time.Timer // created by SetupWait; the same timer is used by every Wait call
  }
  220. func NewTimeoutCond(l sync.Locker) *TimeoutCond {
  221. return &TimeoutCond{
  222. L: l,
  223. }
  224. }
  225. func (c *TimeoutCond) Broadcast() {
  226. // ch.L must be locked when calling this function
  227. if c.ch != nil {
  228. close(c.ch)
  229. c.ch = nil
  230. }
  231. }
  232. func (c *TimeoutCond) SetupWait(timeout time.Duration) *TimeoutCondWaiter {
  233. timer := time.NewTimer(timeout)
  234. return &TimeoutCondWaiter{
  235. c: c,
  236. timer: timer,
  237. }
  238. }
  239. func (w *TimeoutCondWaiter) Wait() bool {
  240. // ch.L must be locked when calling this function
  241. // Ensure that the channel exists, since we're going to be waiting on it
  242. if w.c.ch == nil {
  243. w.c.ch = make(chan struct{})
  244. }
  245. ch := w.c.ch
  246. w.c.L.Unlock()
  247. defer w.c.L.Lock()
  248. select {
  249. case <-w.timer.C:
  250. return false
  251. case <-ch:
  252. return true
  253. }
  254. }
  255. func (w *TimeoutCondWaiter) Stop() {
  256. w.timer.Stop()
  257. }
  // standardClock is the clock implementation backed by the time package.
  type standardClock struct{}

  // Now returns time.Now(). The receiver is never used, so the method is safe
  // to call on a nil *standardClock.
  func (*standardClock) Now() time.Time { return time.Now() }