subscription.go 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package event
  17. import (
  18. "context"
  19. "sync"
  20. "time"
  21. "github.com/ethereum/go-ethereum/common/mclock"
  22. )
  23. // Subscription represents a stream of events. The carrier of the events is typically a
  24. // channel, but isn't part of the interface.
  25. //
  26. // Subscriptions can fail while established. Failures are reported through an error
  27. // channel. It receives a value if there is an issue with the subscription (e.g. the
  28. // network connection delivering the events has been closed). Only one value will ever be
  29. // sent.
  30. //
  31. // The error channel is closed when the subscription ends successfully (i.e. when the
  32. // source of events is closed). It is also closed when Unsubscribe is called.
  33. //
  34. // The Unsubscribe method cancels the sending of events. You must call Unsubscribe in all
  35. // cases to ensure that resources related to the subscription are released. It can be
  36. // called any number of times.
  37. type Subscription interface {
  38. Err() <-chan error // returns the error channel
  39. Unsubscribe() // cancels sending of events, closing the error channel
  40. }
  41. // NewSubscription runs a producer function as a subscription in a new goroutine. The
  42. // channel given to the producer is closed when Unsubscribe is called. If fn returns an
  43. // error, it is sent on the subscription's error channel.
  44. func NewSubscription(producer func(<-chan struct{}) error) Subscription {
  45. s := &funcSub{unsub: make(chan struct{}), err: make(chan error, 1)}
  46. go func() {
  47. defer close(s.err)
  48. err := producer(s.unsub)
  49. s.mu.Lock()
  50. defer s.mu.Unlock()
  51. if !s.unsubscribed {
  52. if err != nil {
  53. s.err <- err
  54. }
  55. s.unsubscribed = true
  56. }
  57. }()
  58. return s
  59. }
  60. type funcSub struct {
  61. unsub chan struct{}
  62. err chan error
  63. mu sync.Mutex
  64. unsubscribed bool
  65. }
  66. func (s *funcSub) Unsubscribe() {
  67. s.mu.Lock()
  68. if s.unsubscribed {
  69. s.mu.Unlock()
  70. return
  71. }
  72. s.unsubscribed = true
  73. close(s.unsub)
  74. s.mu.Unlock()
  75. // Wait for producer shutdown.
  76. <-s.err
  77. }
  78. func (s *funcSub) Err() <-chan error {
  79. return s.err
  80. }
  81. // Resubscribe calls fn repeatedly to keep a subscription established. When the
  82. // subscription is established, Resubscribe waits for it to fail and calls fn again. This
  83. // process repeats until Unsubscribe is called or the active subscription ends
  84. // successfully.
  85. //
  86. // Resubscribe applies backoff between calls to fn. The time between calls is adapted
  87. // based on the error rate, but will never exceed backoffMax.
  88. func Resubscribe(backoffMax time.Duration, fn ResubscribeFunc) Subscription {
  89. s := &resubscribeSub{
  90. waitTime: backoffMax / 10,
  91. backoffMax: backoffMax,
  92. fn: fn,
  93. err: make(chan error),
  94. unsub: make(chan struct{}),
  95. }
  96. go s.loop()
  97. return s
  98. }
  99. // A ResubscribeFunc attempts to establish a subscription.
  100. type ResubscribeFunc func(context.Context) (Subscription, error)
  101. type resubscribeSub struct {
  102. fn ResubscribeFunc
  103. err chan error
  104. unsub chan struct{}
  105. unsubOnce sync.Once
  106. lastTry mclock.AbsTime
  107. waitTime, backoffMax time.Duration
  108. }
  109. func (s *resubscribeSub) Unsubscribe() {
  110. s.unsubOnce.Do(func() {
  111. s.unsub <- struct{}{}
  112. <-s.err
  113. })
  114. }
  115. func (s *resubscribeSub) Err() <-chan error {
  116. return s.err
  117. }
  118. func (s *resubscribeSub) loop() {
  119. defer close(s.err)
  120. var done bool
  121. for !done {
  122. sub := s.subscribe()
  123. if sub == nil {
  124. break
  125. }
  126. done = s.waitForError(sub)
  127. sub.Unsubscribe()
  128. }
  129. }
  130. func (s *resubscribeSub) subscribe() Subscription {
  131. subscribed := make(chan error)
  132. var sub Subscription
  133. retry:
  134. for {
  135. s.lastTry = mclock.Now()
  136. ctx, cancel := context.WithCancel(context.Background())
  137. go func() {
  138. rsub, err := s.fn(ctx)
  139. sub = rsub
  140. subscribed <- err
  141. }()
  142. select {
  143. case err := <-subscribed:
  144. cancel()
  145. if err != nil {
  146. // Subscribing failed, wait before launching the next try.
  147. if s.backoffWait() {
  148. return nil
  149. }
  150. continue retry
  151. }
  152. if sub == nil {
  153. panic("event: ResubscribeFunc returned nil subscription and no error")
  154. }
  155. return sub
  156. case <-s.unsub:
  157. cancel()
  158. return nil
  159. }
  160. }
  161. }
  162. func (s *resubscribeSub) waitForError(sub Subscription) bool {
  163. defer sub.Unsubscribe()
  164. select {
  165. case err := <-sub.Err():
  166. return err == nil
  167. case <-s.unsub:
  168. return true
  169. }
  170. }
  171. func (s *resubscribeSub) backoffWait() bool {
  172. if time.Duration(mclock.Now()-s.lastTry) > s.backoffMax {
  173. s.waitTime = s.backoffMax / 10
  174. } else {
  175. s.waitTime *= 2
  176. if s.waitTime > s.backoffMax {
  177. s.waitTime = s.backoffMax
  178. }
  179. }
  180. t := time.NewTimer(s.waitTime)
  181. defer t.Stop()
  182. select {
  183. case <-t.C:
  184. return false
  185. case <-s.unsub:
  186. return true
  187. }
  188. }
  189. // SubscriptionScope provides a facility to unsubscribe multiple subscriptions at once.
  190. //
  191. // For code that handle more than one subscription, a scope can be used to conveniently
  192. // unsubscribe all of them with a single call. The example demonstrates a typical use in a
  193. // larger program.
  194. //
  195. // The zero value is ready to use.
  196. type SubscriptionScope struct {
  197. mu sync.Mutex
  198. subs map[*scopeSub]struct{}
  199. closed bool
  200. }
  201. type scopeSub struct {
  202. sc *SubscriptionScope
  203. s Subscription
  204. }
  205. // Track starts tracking a subscription. If the scope is closed, Track returns nil. The
  206. // returned subscription is a wrapper. Unsubscribing the wrapper removes it from the
  207. // scope.
  208. func (sc *SubscriptionScope) Track(s Subscription) Subscription {
  209. sc.mu.Lock()
  210. defer sc.mu.Unlock()
  211. if sc.closed {
  212. return nil
  213. }
  214. if sc.subs == nil {
  215. sc.subs = make(map[*scopeSub]struct{})
  216. }
  217. ss := &scopeSub{sc, s}
  218. sc.subs[ss] = struct{}{}
  219. return ss
  220. }
  221. // Close calls Unsubscribe on all tracked subscriptions and prevents further additions to
  222. // the tracked set. Calls to Track after Close return nil.
  223. func (sc *SubscriptionScope) Close() {
  224. sc.mu.Lock()
  225. defer sc.mu.Unlock()
  226. if sc.closed {
  227. return
  228. }
  229. sc.closed = true
  230. for s := range sc.subs {
  231. s.s.Unsubscribe()
  232. }
  233. sc.subs = nil
  234. }
  235. // Count returns the number of tracked subscriptions.
  236. // It is meant to be used for debugging.
  237. func (sc *SubscriptionScope) Count() int {
  238. sc.mu.Lock()
  239. defer sc.mu.Unlock()
  240. return len(sc.subs)
  241. }
  242. func (s *scopeSub) Unsubscribe() {
  243. s.s.Unsubscribe()
  244. s.sc.mu.Lock()
  245. defer s.sc.mu.Unlock()
  246. delete(s.sc.subs, s)
  247. }
  248. func (s *scopeSub) Err() <-chan error {
  249. return s.s.Err()
  250. }