123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276 |
- // Copyright 2016 The go-ethereum Authors
- // This file is part of the go-ethereum library.
- //
- // The go-ethereum library is free software: you can redistribute it and/or modify
- // it under the terms of the GNU Lesser General Public License as published by
- // the Free Software Foundation, either version 3 of the License, or
- // (at your option) any later version.
- //
- // The go-ethereum library is distributed in the hope that it will be useful,
- // but WITHOUT ANY WARRANTY; without even the implied warranty of
- // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- // GNU Lesser General Public License for more details.
- //
- // You should have received a copy of the GNU Lesser General Public License
- // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
- package event
- import (
- "context"
- "sync"
- "time"
- "github.com/ethereum/go-ethereum/common/mclock"
- )
// Subscription is a handle on a running stream of events. Events usually arrive on a
// channel, but that channel is deliberately not part of this interface.
//
// An established subscription can still fail. A failure (for example, the network
// connection that delivers the events being closed) is reported by sending a value on
// the error channel. At most one value is ever sent.
//
// The error channel is closed when the subscription ends successfully, i.e. when the
// source of events is closed. It is also closed when Unsubscribe is called.
//
// Unsubscribe stops the delivery of events. It must be called in all cases so that
// the resources held for the subscription are released, and it may be called any
// number of times.
type Subscription interface {
	// Unsubscribe cancels sending of events, closing the error channel.
	Unsubscribe()

	// Err returns the error channel.
	Err() <-chan error
}
- // NewSubscription runs a producer function as a subscription in a new goroutine. The
- // channel given to the producer is closed when Unsubscribe is called. If fn returns an
- // error, it is sent on the subscription's error channel.
- func NewSubscription(producer func(<-chan struct{}) error) Subscription {
- s := &funcSub{unsub: make(chan struct{}), err: make(chan error, 1)}
- go func() {
- defer close(s.err)
- err := producer(s.unsub)
- s.mu.Lock()
- defer s.mu.Unlock()
- if !s.unsubscribed {
- if err != nil {
- s.err <- err
- }
- s.unsubscribed = true
- }
- }()
- return s
- }
// funcSub is the Subscription implementation returned by NewSubscription.
type funcSub struct {
	unsub chan struct{} // closed by Unsubscribe to ask the producer to stop
	err   chan error    // error channel handed out by Err

	mu           sync.Mutex // guards unsubscribed
	unsubscribed bool       // set once the subscription has been ended
}

// Unsubscribe signals the producer to stop and waits for it to shut down. Calls made
// after the subscription has already ended return immediately.
func (s *funcSub) Unsubscribe() {
	// claim reports whether this call is the one that ends the subscription.
	claim := func() bool {
		s.mu.Lock()
		defer s.mu.Unlock()
		if s.unsubscribed {
			return false
		}
		s.unsubscribed = true
		close(s.unsub)
		return true
	}
	if !claim() {
		return
	}
	// Wait for producer shutdown. The lock is no longer held at this point.
	<-s.err
}

// Err returns the subscription's error channel.
func (s *funcSub) Err() <-chan error {
	return s.err
}
- // Resubscribe calls fn repeatedly to keep a subscription established. When the
- // subscription is established, Resubscribe waits for it to fail and calls fn again. This
- // process repeats until Unsubscribe is called or the active subscription ends
- // successfully.
- //
- // Resubscribe applies backoff between calls to fn. The time between calls is adapted
- // based on the error rate, but will never exceed backoffMax.
- func Resubscribe(backoffMax time.Duration, fn ResubscribeFunc) Subscription {
- s := &resubscribeSub{
- waitTime: backoffMax / 10,
- backoffMax: backoffMax,
- fn: fn,
- err: make(chan error),
- unsub: make(chan struct{}),
- }
- go s.loop()
- return s
- }
- // A ResubscribeFunc attempts to establish a subscription.
- type ResubscribeFunc func(context.Context) (Subscription, error)
- type resubscribeSub struct {
- fn ResubscribeFunc
- err chan error
- unsub chan struct{}
- unsubOnce sync.Once
- lastTry mclock.AbsTime
- waitTime, backoffMax time.Duration
- }
- func (s *resubscribeSub) Unsubscribe() {
- s.unsubOnce.Do(func() {
- s.unsub <- struct{}{}
- <-s.err
- })
- }
- func (s *resubscribeSub) Err() <-chan error {
- return s.err
- }
- func (s *resubscribeSub) loop() {
- defer close(s.err)
- var done bool
- for !done {
- sub := s.subscribe()
- if sub == nil {
- break
- }
- done = s.waitForError(sub)
- sub.Unsubscribe()
- }
- }
- func (s *resubscribeSub) subscribe() Subscription {
- subscribed := make(chan error)
- var sub Subscription
- retry:
- for {
- s.lastTry = mclock.Now()
- ctx, cancel := context.WithCancel(context.Background())
- go func() {
- rsub, err := s.fn(ctx)
- sub = rsub
- subscribed <- err
- }()
- select {
- case err := <-subscribed:
- cancel()
- if err != nil {
- // Subscribing failed, wait before launching the next try.
- if s.backoffWait() {
- return nil
- }
- continue retry
- }
- if sub == nil {
- panic("event: ResubscribeFunc returned nil subscription and no error")
- }
- return sub
- case <-s.unsub:
- cancel()
- return nil
- }
- }
- }
- func (s *resubscribeSub) waitForError(sub Subscription) bool {
- defer sub.Unsubscribe()
- select {
- case err := <-sub.Err():
- return err == nil
- case <-s.unsub:
- return true
- }
- }
- func (s *resubscribeSub) backoffWait() bool {
- if time.Duration(mclock.Now()-s.lastTry) > s.backoffMax {
- s.waitTime = s.backoffMax / 10
- } else {
- s.waitTime *= 2
- if s.waitTime > s.backoffMax {
- s.waitTime = s.backoffMax
- }
- }
- t := time.NewTimer(s.waitTime)
- defer t.Stop()
- select {
- case <-t.C:
- return false
- case <-s.unsub:
- return true
- }
- }
- // SubscriptionScope provides a facility to unsubscribe multiple subscriptions at once.
- //
- // For code that handle more than one subscription, a scope can be used to conveniently
- // unsubscribe all of them with a single call. The example demonstrates a typical use in a
- // larger program.
- //
- // The zero value is ready to use.
- type SubscriptionScope struct {
- mu sync.Mutex
- subs map[*scopeSub]struct{}
- closed bool
- }
- type scopeSub struct {
- sc *SubscriptionScope
- s Subscription
- }
- // Track starts tracking a subscription. If the scope is closed, Track returns nil. The
- // returned subscription is a wrapper. Unsubscribing the wrapper removes it from the
- // scope.
- func (sc *SubscriptionScope) Track(s Subscription) Subscription {
- sc.mu.Lock()
- defer sc.mu.Unlock()
- if sc.closed {
- return nil
- }
- if sc.subs == nil {
- sc.subs = make(map[*scopeSub]struct{})
- }
- ss := &scopeSub{sc, s}
- sc.subs[ss] = struct{}{}
- return ss
- }
- // Close calls Unsubscribe on all tracked subscriptions and prevents further additions to
- // the tracked set. Calls to Track after Close return nil.
- func (sc *SubscriptionScope) Close() {
- sc.mu.Lock()
- defer sc.mu.Unlock()
- if sc.closed {
- return
- }
- sc.closed = true
- for s := range sc.subs {
- s.s.Unsubscribe()
- }
- sc.subs = nil
- }
- // Count returns the number of tracked subscriptions.
- // It is meant to be used for debugging.
- func (sc *SubscriptionScope) Count() int {
- sc.mu.Lock()
- defer sc.mu.Unlock()
- return len(sc.subs)
- }
- func (s *scopeSub) Unsubscribe() {
- s.s.Unsubscribe()
- s.sc.mu.Lock()
- defer s.sc.mu.Unlock()
- delete(s.sc.subs, s)
- }
- func (s *scopeSub) Err() <-chan error {
- return s.s.Err()
- }
|