register.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199
  1. /*
  2. Copyright 2017 Google Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package client
  14. import (
  15. "errors"
  16. "fmt"
  17. "sort"
  18. "sync"
  19. "time"
  20. log "github.com/golang/glog"
  21. "context"
  22. "notabug.org/themusicgod1/gnmi/errlist"
  23. )
  24. var (
  25. mu sync.Mutex
  26. clientImpl = map[string]InitImpl{}
  27. )
  28. // Default timeout for all queries.
  29. const defaultTimeout = time.Minute
  30. // Impl is the protocol/RPC specific implementation of the streaming Client.
  31. // Unless you're implementing a new RPC format, this shouldn't be used directly.
  32. type Impl interface {
  33. // Subscribe sends a Subscribe request to the server.
  34. Subscribe(context.Context, Query) error
  35. // Recv processes a single message from the server. This method is exposed to
  36. // allow the generic client control the state of message processing.
  37. Recv() error
  38. // Close will close the underlying rpc connections.
  39. Close() error
  40. // Poll will send an implementation specific Poll request to the server.
  41. Poll() error
  42. // Set will make updates/deletes on the given values in SetRequest.
  43. Set(context.Context, SetRequest) (SetResponse, error)
  44. }
  45. // InitImpl is a constructor signature for all transport specific implementations.
  46. type InitImpl func(context.Context, Destination) (Impl, error)
  47. // Register will register the transport specific implementation.
  48. // The name must be unique across all transports.
  49. func Register(t string, f InitImpl) error {
  50. mu.Lock()
  51. defer mu.Unlock()
  52. if _, ok := clientImpl[t]; ok {
  53. return fmt.Errorf("Duplicate registration of type %q", t)
  54. }
  55. if f == nil {
  56. return errors.New("RegisterFunc cannot be nil")
  57. }
  58. clientImpl[t] = f
  59. log.V(1).Infof("client.Register(%q, func) successful.", t)
  60. return nil
  61. }
  62. // RegisterTest allows tests to override client implementation for any client
  63. // type. It's identical to Register, except t uniqueness is not enforced.
  64. //
  65. // RegisterTest is similar to ResetRegisteredImpls + Register.
  66. // Commonly used with the fake client (./fake directory).
  67. func RegisterTest(t string, f InitImpl) error {
  68. mu.Lock()
  69. defer mu.Unlock()
  70. if f == nil {
  71. return errors.New("RegisterFunc cannot be nil")
  72. }
  73. clientImpl[t] = f
  74. log.V(1).Infof("client.Register(%q, func) successful.", t)
  75. return nil
  76. }
  77. // NewImpl returns a client implementation based on the registered types.
  78. // It will try all clientTypes listed in parallel until one succeeds. If
  79. // clientType is nil, it will try all registered clientTypes.
  80. //
  81. // This function is only used internally and is exposed for testing only.
  82. func NewImpl(ctx context.Context, d Destination, clientType ...string) (Impl, error) {
  83. mu.Lock()
  84. registeredCount := len(clientImpl)
  85. if clientType == nil {
  86. for t := range clientImpl {
  87. clientType = append(clientType, t)
  88. }
  89. }
  90. mu.Unlock()
  91. if registeredCount == 0 {
  92. return nil, errors.New("no registered client types")
  93. }
  94. // If Timeout is not set, use a default one. There is pretty much never a
  95. // case where clients will want to wait for initial connection
  96. // indefinitely. Reconnect client helps with retries.
  97. if d.Timeout == 0 {
  98. d.Timeout = defaultTimeout
  99. }
  100. log.V(1).Infof("Attempting client types: %v", clientType)
  101. fn := func(ctx context.Context, typ string, input interface{}) (Impl, error) {
  102. mu.Lock()
  103. f, ok := clientImpl[typ]
  104. mu.Unlock()
  105. if !ok {
  106. return nil, fmt.Errorf("no registered client %q", typ)
  107. }
  108. d := input.(Destination)
  109. impl, err := f(ctx, d)
  110. if err != nil {
  111. return nil, err
  112. }
  113. log.V(1).Infof("client %q create with type %T", typ, impl)
  114. return impl, nil
  115. }
  116. return getFirst(ctx, clientType, d, fn)
  117. }
  118. type implFunc func(ctx context.Context, typ string, input interface{}) (Impl, error)
  119. // getFirst tries fn with all types in parallel and returns the Impl from first
  120. // one to succeed. input is passed directly to fn so it's safe to use an
  121. // unchecked type asserting inside fn.
  122. func getFirst(ctx context.Context, types []string, input interface{}, fn implFunc) (Impl, error) {
  123. if len(types) == 0 {
  124. return nil, errors.New("getFirst: no client types provided")
  125. }
  126. errC := make(chan error, len(types))
  127. implC := make(chan Impl)
  128. done := make(chan struct{})
  129. defer close(done)
  130. for _, t := range types {
  131. // Launch each clientType in parallel where each sends either an error or
  132. // an implementation over a channel.
  133. go func(t string) {
  134. impl, err := fn(ctx, t, input)
  135. if err != nil {
  136. errC <- fmt.Errorf("client %q : %v", t, err)
  137. return
  138. }
  139. select {
  140. case implC <- impl:
  141. case <-done:
  142. impl.Close()
  143. }
  144. }(t)
  145. }
  146. errs := errlist.Error{List: errlist.List{Separator: "\n\t"}}
  147. // Look for the first non-error client implementation or return an error if
  148. // all client types fail.
  149. for {
  150. select {
  151. case err := <-errC:
  152. errs.Add(err)
  153. if len(errs.Errors()) == len(types) {
  154. return nil, errs.Err()
  155. }
  156. case impl := <-implC:
  157. return impl, nil
  158. }
  159. }
  160. }
  161. // ResetRegisteredImpls removes and Impls registered with Register. This should
  162. // only be used in tests to clear out their mock Impls, so that they don't
  163. // affect other tests.
  164. func ResetRegisteredImpls() {
  165. mu.Lock()
  166. defer mu.Unlock()
  167. clientImpl = make(map[string]InitImpl)
  168. }
  169. // RegisteredImpls returns a slice of currently registered client types.
  170. func RegisteredImpls() []string {
  171. mu.Lock()
  172. defer mu.Unlock()
  173. var impls []string
  174. for k := range clientImpl {
  175. impls = append(impls, k)
  176. }
  177. sort.Strings(impls)
  178. return impls
  179. }