manager.go 8.5 KB

  1. // Copyright (C) 2020 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at
  6. //go:generate -command counterfeiter go run
  7. //go:generate counterfeiter -o mocks/manager.go --fake-name Manager . Manager
  8. package discover
  9. import (
  10. "context"
  11. "crypto/tls"
  12. "fmt"
  13. "sort"
  14. "time"
  15. ""
  16. ""
  17. ""
  18. ""
  19. ""
  20. ""
  21. ""
  22. ""
  23. )
  24. // The Manager aggregates results from multiple Finders. Each Finder has
  25. // an associated cache time and negative cache time. The cache time sets how
  26. // long we cache and return successful lookup results, the negative cache
  27. // time sets how long we refrain from asking about the same device ID after
  28. // receiving a negative answer. The value of zero disables caching (positive
  29. // or negative).
  30. type Manager interface {
  31. FinderService
  32. ChildErrors() map[string]error
  33. }
  34. type manager struct {
  35. *suture.Supervisor
  36. myID protocol.DeviceID
  37. cfg config.Wrapper
  38. cert tls.Certificate
  39. evLogger events.Logger
  40. addressLister AddressLister
  41. registry *registry.Registry
  42. finders map[string]cachedFinder
  43. mut sync.RWMutex
  44. }
  45. func NewManager(myID protocol.DeviceID, cfg config.Wrapper, cert tls.Certificate, evLogger events.Logger, lister AddressLister, registry *registry.Registry) Manager {
  46. m := &manager{
  47. Supervisor: suture.New("discover.Manager", svcutil.SpecWithDebugLogger(l)),
  48. myID: myID,
  49. cfg: cfg,
  50. cert: cert,
  51. evLogger: evLogger,
  52. addressLister: lister,
  53. registry: registry,
  54. finders: make(map[string]cachedFinder),
  55. mut: sync.NewRWMutex(),
  56. }
  57. m.Add(svcutil.AsService(m.serve, m.String()))
  58. return m
  59. }
  60. func (m *manager) serve(ctx context.Context) error {
  61. m.cfg.Subscribe(m)
  62. m.CommitConfiguration(config.Configuration{}, m.cfg.RawCopy())
  63. <-ctx.Done()
  64. m.cfg.Unsubscribe(m)
  65. return nil
  66. }
  67. func (m *manager) addLocked(identity string, finder Finder, cacheTime, negCacheTime time.Duration) {
  68. entry := cachedFinder{
  69. Finder: finder,
  70. cacheTime: cacheTime,
  71. negCacheTime: negCacheTime,
  72. cache: newCache(),
  73. token: nil,
  74. }
  75. if service, ok := finder.(suture.Service); ok {
  76. token := m.Supervisor.Add(service)
  77. entry.token = &token
  78. }
  79. m.finders[identity] = entry
  80. l.Infoln("Using discovery mechanism:", identity)
  81. }
  82. func (m *manager) removeLocked(identity string) {
  83. entry, ok := m.finders[identity]
  84. if !ok {
  85. return
  86. }
  87. if entry.token != nil {
  88. err := m.Supervisor.Remove(*entry.token)
  89. if err != nil {
  90. l.Warnf("removing discovery %s: %s", identity, err)
  91. }
  92. }
  93. delete(m.finders, identity)
  94. l.Infoln("Stopped using discovery mechanism: ", identity)
  95. }
  96. // Lookup attempts to resolve the device ID using any of the added Finders,
  97. // while obeying the cache settings.
  98. func (m *manager) Lookup(ctx context.Context, deviceID protocol.DeviceID) (addresses []string, err error) {
  99. m.mut.RLock()
  100. for _, finder := range m.finders {
  101. if cacheEntry, ok := finder.cache.Get(deviceID); ok {
  102. // We have a cache entry. Lets see what it says.
  103. if cacheEntry.found && time.Since(cacheEntry.when) < finder.cacheTime {
  104. // It's a positive, valid entry. Use it.
  105. l.Debugln("cached discovery entry for", deviceID, "at", finder)
  106. l.Debugln(" cache:", cacheEntry)
  107. addresses = append(addresses, cacheEntry.Addresses...)
  108. continue
  109. }
  110. valid := time.Now().Before(cacheEntry.validUntil) || time.Since(cacheEntry.when) < finder.negCacheTime
  111. if !cacheEntry.found && valid {
  112. // It's a negative, valid entry. We should not make another
  113. // attempt right now.
  114. l.Debugln("negative cache entry for", deviceID, "at", finder, "valid until", cacheEntry.when.Add(finder.negCacheTime), "or", cacheEntry.validUntil)
  115. continue
  116. }
  117. // It's expired. Ignore and continue.
  118. }
  119. // Perform the actual lookup and cache the result.
  120. if addrs, err := finder.Lookup(ctx, deviceID); err == nil {
  121. l.Debugln("lookup for", deviceID, "at", finder)
  122. l.Debugln(" addresses:", addrs)
  123. addresses = append(addresses, addrs...)
  124. finder.cache.Set(deviceID, CacheEntry{
  125. Addresses: addrs,
  126. when: time.Now(),
  127. found: len(addrs) > 0,
  128. })
  129. } else {
  130. // Lookup returned error, add a negative cache entry.
  131. entry := CacheEntry{
  132. when: time.Now(),
  133. found: false,
  134. }
  135. if err, ok := err.(cachedError); ok {
  136. entry.validUntil = time.Now().Add(err.CacheFor())
  137. }
  138. finder.cache.Set(deviceID, entry)
  139. }
  140. }
  141. m.mut.RUnlock()
  142. addresses = stringutil.UniqueTrimmedStrings(addresses)
  143. sort.Strings(addresses)
  144. l.Debugln("lookup results for", deviceID)
  145. l.Debugln(" addresses: ", addresses)
  146. return addresses, nil
  147. }
  148. func (*manager) String() string {
  149. return "discovery cache"
  150. }
  151. func (*manager) Error() error {
  152. return nil
  153. }
  154. func (m *manager) ChildErrors() map[string]error {
  155. children := make(map[string]error, len(m.finders))
  156. m.mut.RLock()
  157. for _, f := range m.finders {
  158. children[f.String()] = f.Error()
  159. }
  160. m.mut.RUnlock()
  161. return children
  162. }
  163. func (m *manager) Cache() map[protocol.DeviceID]CacheEntry {
  164. // Res will be the "total" cache, i.e. the union of our cache and all our
  165. // children's caches.
  166. res := make(map[protocol.DeviceID]CacheEntry)
  167. m.mut.RLock()
  168. for _, finder := range m.finders {
  169. // Each finder[i] has a corresponding cache. Go through
  170. // it and populate the total, appending any addresses and keeping
  171. // the newest "when" time. We skip any negative cache finders.
  172. for k, v := range finder.cache.Cache() {
  173. if v.found {
  174. cur := res[k]
  175. if v.when.After(cur.when) {
  176. cur.when = v.when
  177. }
  178. cur.Addresses = append(cur.Addresses, v.Addresses...)
  179. res[k] = cur
  180. }
  181. }
  182. // Then ask the finder itself for its cache and do the same. If this
  183. // finder is a global discovery client, it will have no cache. If it's
  184. // a local discovery client, this will be its current state.
  185. for k, v := range finder.Cache() {
  186. if v.found {
  187. cur := res[k]
  188. if v.when.After(cur.when) {
  189. cur.when = v.when
  190. }
  191. cur.Addresses = append(cur.Addresses, v.Addresses...)
  192. res[k] = cur
  193. }
  194. }
  195. }
  196. m.mut.RUnlock()
  197. for k, v := range res {
  198. v.Addresses = stringutil.UniqueTrimmedStrings(v.Addresses)
  199. res[k] = v
  200. }
  201. return res
  202. }
  203. func (m *manager) CommitConfiguration(_, to config.Configuration) (handled bool) {
  204. m.mut.Lock()
  205. defer m.mut.Unlock()
  206. toIdentities := make(map[string]struct{})
  207. if to.Options.GlobalAnnEnabled {
  208. for _, srv := range to.Options.GlobalDiscoveryServers() {
  209. toIdentities[globalDiscoveryIdentity(srv)] = struct{}{}
  210. }
  211. }
  212. if to.Options.LocalAnnEnabled {
  213. toIdentities[ipv4Identity(to.Options.LocalAnnPort)] = struct{}{}
  214. toIdentities[ipv6Identity(to.Options.LocalAnnMCAddr)] = struct{}{}
  215. }
  216. // Remove things that we're not expected to have.
  217. for identity := range m.finders {
  218. if _, ok := toIdentities[identity]; !ok {
  219. m.removeLocked(identity)
  220. }
  221. }
  222. // Add things we don't have.
  223. if to.Options.GlobalAnnEnabled {
  224. for _, srv := range to.Options.GlobalDiscoveryServers() {
  225. identity := globalDiscoveryIdentity(srv)
  226. // Skip, if it's already running.
  227. if _, ok := m.finders[identity]; ok {
  228. continue
  229. }
  230. gd, err := NewGlobal(srv, m.cert, m.addressLister, m.evLogger, m.registry)
  231. if err != nil {
  232. l.Warnln("Global discovery:", err)
  233. continue
  234. }
  235. // Each global discovery server gets its results cached for five
  236. // minutes, and is not asked again for a minute when it's returned
  237. // unsuccessfully.
  238. m.addLocked(identity, gd, 5*time.Minute, time.Minute)
  239. }
  240. }
  241. if to.Options.LocalAnnEnabled {
  242. // v4 broadcasts
  243. v4Identity := ipv4Identity(to.Options.LocalAnnPort)
  244. if _, ok := m.finders[v4Identity]; !ok {
  245. bcd, err := NewLocal(m.myID, fmt.Sprintf(":%d", to.Options.LocalAnnPort), m.addressLister, m.evLogger)
  246. if err != nil {
  247. l.Warnln("IPv4 local discovery:", err)
  248. } else {
  249. m.addLocked(v4Identity, bcd, 0, 0)
  250. }
  251. }
  252. // v6 multicasts
  253. v6Identity := ipv6Identity(to.Options.LocalAnnMCAddr)
  254. if _, ok := m.finders[v6Identity]; !ok {
  255. mcd, err := NewLocal(m.myID, to.Options.LocalAnnMCAddr, m.addressLister, m.evLogger)
  256. if err != nil {
  257. l.Warnln("IPv6 local discovery:", err)
  258. } else {
  259. m.addLocked(v6Identity, mcd, 0, 0)
  260. }
  261. }
  262. }
  263. return true
  264. }