client.go 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235
  1. /*
  2. Copyright 2017 Google Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. // Package client provides a generic access layer for streaming telemetry
  14. // providers.
  15. //
  16. // The Client interface is implemented by 3 types in this package:
  17. //
  18. // - BaseClient simply forwards all messages from the underlying connection to
  19. // NotificationHandler or ProtoHandler (see type Query).
  20. //
  21. // - CacheClient wraps around BaseClient and adds a persistence layer for all
  22. // notifications. The notifications build up an internal tree which can be
  23. // queried and walked using CacheClient's methods.
  24. //
  25. // - ReconnectClient wraps around any Client implementation (BaseClient,
  26. // CacheClient or a user-provided one) and adds transparent reconnection loop
  27. // in Subscribe. Reconnection attempts are done with exponential backoff.
  28. //
  29. // This package uses pluggable transport implementations. For example, for gNMI
  30. // targets you need to add this blank import:
  31. // import _ "notabug.org/themusicgod1/gnmi/client/gnmi"
  32. //
  33. // That import will automatically register itself as available ClientType in
  34. // this package (using func init).
  35. //
  36. // If you want to write a custom implementation, implement Impl interface and
  37. // register it with unique name via func Register.
  38. //
  39. // Take a look at package examples in godoc for typical use cases.
  40. package client
  41. import (
  42. "errors"
  43. "fmt"
  44. "io"
  45. "sync"
  46. log "github.com/golang/glog"
  47. "context"
  48. )
  49. // Client defines a set of methods which every client must implement.
  50. // This package provides a few implementations: BaseClient, CacheClient,
  51. // ReconnectClient.
  52. //
  53. // Do not confuse this with Impl.
  54. type Client interface {
  55. // Subscribe will perform the provided query against the requested
  56. // clientType. clientType is the name of a specific Impl specified in
  57. // Register (most implementations will call Register in init()).
  58. //
  59. // It will try each clientType listed in order until one succeeds. If
  60. // clientType is nil, it will try each registered clientType in random
  61. // order.
  62. Subscribe(ctx context.Context, q Query, clientType ...string) error
  63. // Poll will send a poll request to the server and process all
  64. // notifications. It is up the caller to identify the sync and realize the
  65. // Poll is complete.
  66. Poll() error
  67. // Close terminates the underlying Impl, which usually terminates the
  68. // connection right away.
  69. // Close must be called to release any resources that Impl could have
  70. // allocated.
  71. Close() error
  72. // Impl will return the underlying client implementation. Most users
  73. // shouldn't use this.
  74. Impl() (Impl, error)
  75. // Set will make updates/deletes on the given values in SetRequest.
  76. //
  77. // Note that SetResponse and inner SetResult's contain Err fields that
  78. // should be checked manually. Error from Set is only related to
  79. // transport-layer issues in the RPC.
  80. Set(ctx context.Context, r SetRequest, clientType ...string) (SetResponse, error)
  81. }
  82. var (
  83. // ErrStopReading is the common error defined to have the client stop a read
  84. // loop.
  85. ErrStopReading = errors.New("stop the result reading loop")
  86. // ErrClientInit is the common error for when making calls before the client
  87. // has been started via Subscribe.
  88. ErrClientInit = errors.New("Subscribe() must be called before any operations on client")
  89. // ErrUnsupported is returned by Impl's methods when the underlying
  90. // implementation doesn't support it.
  91. ErrUnsupported = errors.New("operation not supported by client implementation")
  92. )
  93. // BaseClient is a streaming telemetry client with minimal footprint. The
  94. // caller must call Subscribe to perform the actual query. BaseClient stores no
  95. // state. All updates must be handled by the provided handlers inside of
  96. // Query.
  97. //
  98. // The zero value of BaseClient is ready for use (there is no constructor).
  99. type BaseClient struct {
  100. mu sync.RWMutex
  101. closed bool
  102. clientImpl Impl
  103. query Query
  104. }
  105. var _ Client = &BaseClient{}
  106. // Subscribe implements the Client interface.
  107. func (c *BaseClient) Subscribe(ctx context.Context, q Query, clientType ...string) error {
  108. if err := q.Validate(); err != nil {
  109. return err
  110. }
  111. if len(clientType) == 0 {
  112. clientType = RegisteredImpls()
  113. }
  114. // TODO: concurrent subscribes can be removed after we enforce reflection
  115. // at client Impl level.
  116. fn := func(ctx context.Context, typ string, input interface{}) (Impl, error) {
  117. q := input.(Query)
  118. impl, err := NewImpl(ctx, q.Destination(), typ)
  119. if err != nil {
  120. return nil, err
  121. }
  122. if err := impl.Subscribe(ctx, q); err != nil {
  123. impl.Close()
  124. return nil, err
  125. }
  126. return impl, nil
  127. }
  128. impl, err := getFirst(ctx, clientType, q, fn)
  129. if err != nil {
  130. return err
  131. }
  132. c.mu.Lock()
  133. c.query = q
  134. if c.clientImpl != nil {
  135. c.clientImpl.Close()
  136. }
  137. c.clientImpl = impl
  138. c.closed = false
  139. c.mu.Unlock()
  140. return c.run(impl)
  141. }
  142. // Poll implements the Client interface.
  143. func (c *BaseClient) Poll() error {
  144. impl, err := c.Impl()
  145. if err != nil {
  146. return ErrClientInit
  147. }
  148. if c.query.Type != Poll {
  149. return fmt.Errorf("Poll() can only be used on Poll query type: %v", c.query.Type)
  150. }
  151. if err := impl.Poll(); err != nil {
  152. return err
  153. }
  154. return c.run(impl)
  155. }
  156. // Close implements the Client interface.
  157. func (c *BaseClient) Close() error {
  158. c.mu.Lock()
  159. defer c.mu.Unlock()
  160. if c.clientImpl == nil {
  161. return ErrClientInit
  162. }
  163. c.closed = true
  164. return c.clientImpl.Close()
  165. }
  166. // Impl implements the Client interface.
  167. func (c *BaseClient) Impl() (Impl, error) {
  168. c.mu.Lock()
  169. defer c.mu.Unlock()
  170. if c.clientImpl == nil {
  171. return nil, ErrClientInit
  172. }
  173. return c.clientImpl, nil
  174. }
  175. // Set implements the Client interface.
  176. func (c *BaseClient) Set(ctx context.Context, r SetRequest, clientType ...string) (SetResponse, error) {
  177. impl, err := NewImpl(ctx, r.Destination, clientType...)
  178. if err != nil {
  179. return SetResponse{}, err
  180. }
  181. c.mu.Lock()
  182. c.clientImpl = impl
  183. c.mu.Unlock()
  184. return impl.Set(ctx, r)
  185. }
  186. func (c *BaseClient) run(impl Impl) error {
  187. for {
  188. err := impl.Recv()
  189. switch err {
  190. default:
  191. log.V(1).Infof("impl.Recv() received unknown error: %v", err)
  192. impl.Close()
  193. return err
  194. case io.EOF, ErrStopReading:
  195. log.V(1).Infof("impl.Recv() stop marker: %v", err)
  196. return nil
  197. case nil:
  198. }
  199. // Close fast, so that we don't deliver any buffered updates.
  200. //
  201. // Note: this approach still allows at most 1 update through after
  202. // Close. A more thorough solution would be to do the check at
  203. // Notification/ProtoHandler or Impl level, but that would involve much
  204. // more work.
  205. c.mu.RLock()
  206. closed := c.closed
  207. c.mu.RUnlock()
  208. if closed {
  209. return nil
  210. }
  211. }
  212. }