reconnect.go 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201
  1. /*
  2. Copyright 2017 Google Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package client
  14. import (
  15. "fmt"
  16. "math/rand"
  17. "sync"
  18. "time"
  19. log "github.com/golang/glog"
  20. "context"
  21. )
  var (
	// ReconnectBaseDelay is the shortest pause Reconnect waits before it
	// re-invokes Subscribe on the wrapped client. Adjust it before creating
	// any ReconnectClient; Subscribe reads it without synchronization.
	ReconnectBaseDelay = 1 * time.Second

	// ReconnectMaxDelay caps the pause Reconnect waits between re-Subscribe
	// attempts. Adjust it before creating any ReconnectClient; Subscribe
	// reads it without synchronization.
	ReconnectMaxDelay = 60 * time.Second
)
  30. // ReconnectClient is a wrapper around any Client that never returns from
  31. // Subscribe (unless explicitly closed). Underlying calls to Subscribe are
  32. // repeated indefinitely, with an exponential backoff between attempts.
  33. //
  34. // ReconnectClient should only be used with streaming or polling queries. Once
  35. // queries will fail immediately in Subscribe.
  36. type ReconnectClient struct {
  37. Client
  38. disconnect func()
  39. reset func()
  40. mu sync.Mutex
  41. subscribeDone chan struct{}
  42. cancel func()
  43. closed bool
  44. }
  45. var _ Client = &ReconnectClient{}
  46. // Reconnect wraps c and returns a new ReconnectClient using it.
  47. //
  48. // disconnect callback is called each time the underlying Subscribe returns, it
  49. // may be nil.
  50. //
  51. // reset callback is called each time the underlying Subscribe is retried, it
  52. // may be nil.
  53. //
  54. // Closing the returned ReconnectClient will unblock Subscribe.
  55. func Reconnect(c Client, disconnect, reset func()) *ReconnectClient {
  56. return &ReconnectClient{Client: c, disconnect: disconnect, reset: reset}
  57. }
  58. // Subscribe implements Client interface.
  59. func (p *ReconnectClient) Subscribe(ctx context.Context, q Query, clientType ...string) error {
  60. switch q.Type {
  61. default:
  62. return fmt.Errorf("ReconnectClient used for %s query", q.Type)
  63. case Stream, Poll:
  64. }
  65. ctx, done := p.initDone(ctx)
  66. defer done()
  67. failCount := 0
  68. for {
  69. start := time.Now()
  70. err := p.Client.Subscribe(ctx, q, clientType...)
  71. if p.disconnect != nil {
  72. p.disconnect()
  73. }
  74. failCount++
  75. // Check if Subscribe returned because ctx was canceled.
  76. select {
  77. case <-ctx.Done():
  78. return ctx.Err()
  79. default:
  80. }
  81. if err == nil {
  82. failCount = 0
  83. }
  84. // Since Client won't tell us whether error was immediate or after
  85. // streaming for a while, try to "guess" if it's the latter.
  86. if time.Since(start) > ReconnectMaxDelay {
  87. failCount = 0
  88. }
  89. bo := backoff(ReconnectBaseDelay, ReconnectMaxDelay, failCount)
  90. log.Errorf("client.Subscribe (target %q) failed (%d times): %v; reconnecting in %s", q.Target, failCount, err, bo)
  91. time.Sleep(bo)
  92. // Signal caller right before we attempt to reconnect.
  93. if p.reset != nil {
  94. p.reset()
  95. }
  96. }
  97. }
  98. // initDone finishes Subscribe initialization before starting the inner
  99. // Subscribe loop.
  100. // If p is closed before initDone, a cancelled context is returned.
  101. func (p *ReconnectClient) initDone(ctx context.Context) (context.Context, func()) {
  102. p.mu.Lock()
  103. defer p.mu.Unlock()
  104. p.subscribeDone = make(chan struct{})
  105. // ctx is cancelled in p.Close().
  106. ctx, p.cancel = context.WithCancel(ctx)
  107. // If Close was called before initDone returned, it didn't have a cancel
  108. // func to trigger. Trigger it here instead.
  109. // Since initDone and Cancel are synchronizing on p.mu, either this or
  110. // Close will call p.cancel(), preventing a hanging client.
  111. if p.closed {
  112. p.cancel()
  113. }
  114. return ctx, func() {
  115. close(p.subscribeDone)
  116. }
  117. }
  118. // Close implements Client interface.
  119. func (p *ReconnectClient) Close() error {
  120. subscribeDone := func() chan struct{} {
  121. p.mu.Lock()
  122. defer p.mu.Unlock()
  123. if p.cancel != nil {
  124. p.cancel()
  125. }
  126. p.closed = true
  127. return p.subscribeDone
  128. }()
  129. err := p.Client.Close()
  130. // Wait for Subscribe to return.
  131. if subscribeDone != nil {
  132. <-subscribeDone
  133. }
  134. return err
  135. }
  136. // Impl implements Client interface.
  137. func (p *ReconnectClient) Impl() (Impl, error) {
  138. return p.Client.Impl()
  139. }
  140. // Poll implements Client interface.
  141. // Poll may fail if Subscribe is reconnecting when it's called.
  142. func (p *ReconnectClient) Poll() error {
  143. return p.Client.Poll()
  144. }
  const (
	// backoffFactor multiplies the delay once per retry.
	backoffFactor = 1.3
	// backoffRange is the largest fraction by which a delay is randomly
	// shortened.
	backoffRange = 0.4
)

// backoff returns how long to wait before retrying a query.
//
// The delay starts at baseDelay and is multiplied by backoffFactor once per
// retry until it reaches maxDelay. It is then capped at maxDelay. Finally, it
// is randomly shortened by up to backoffRange (40%) so that clients that
// started together do not retry in lockstep.
//
// Because the jitter only ever shortens the delay, the result does not exceed
// a non-negative maxDelay. A negative intermediate result is clamped to zero.
func backoff(baseDelay, maxDelay time.Duration, retries int) time.Duration {
	delay := float64(baseDelay)
	ceiling := float64(maxDelay)
	for n := retries; n > 0 && delay < ceiling; n-- {
		delay *= backoffFactor
	}
	if delay > ceiling {
		delay = ceiling
	}
	delay -= delay * backoffRange * rand.Float64()
	switch {
	case delay < 0:
		return 0
	default:
		return time.Duration(delay)
	}
}