service.go 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375
// Copyright (C) 2015 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package nat
  7. import (
  8. "context"
  9. "fmt"
  10. "hash/fnv"
  11. "math/rand"
  12. "net"
  13. stdsync "sync"
  14. "time"
  15. "github.com/syncthing/syncthing/lib/config"
  16. "github.com/syncthing/syncthing/lib/protocol"
  17. "github.com/syncthing/syncthing/lib/sync"
  18. )
  19. // Service runs a loop for discovery of IGDs (Internet Gateway Devices) and
  20. // setup/renewal of a port mapping.
  21. type Service struct {
  22. id protocol.DeviceID
  23. cfg config.Wrapper
  24. processScheduled chan struct{}
  25. mappings []*Mapping
  26. enabled bool
  27. mut sync.RWMutex
  28. }
  29. func NewService(id protocol.DeviceID, cfg config.Wrapper) *Service {
  30. s := &Service{
  31. id: id,
  32. cfg: cfg,
  33. processScheduled: make(chan struct{}, 1),
  34. mut: sync.NewRWMutex(),
  35. }
  36. cfgCopy := cfg.RawCopy()
  37. s.CommitConfiguration(cfgCopy, cfgCopy)
  38. return s
  39. }
  40. func (s *Service) CommitConfiguration(_, to config.Configuration) bool {
  41. s.mut.Lock()
  42. if !s.enabled && to.Options.NATEnabled {
  43. l.Debugln("Starting NAT service")
  44. s.enabled = true
  45. s.scheduleProcess()
  46. } else if s.enabled && !to.Options.NATEnabled {
  47. l.Debugln("Stopping NAT service")
  48. s.enabled = false
  49. }
  50. s.mut.Unlock()
  51. return true
  52. }
  53. func (s *Service) Serve(ctx context.Context) error {
  54. s.cfg.Subscribe(s)
  55. defer s.cfg.Unsubscribe(s)
  56. announce := stdsync.Once{}
  57. timer := time.NewTimer(0)
  58. for {
  59. select {
  60. case <-timer.C:
  61. case <-s.processScheduled:
  62. if !timer.Stop() {
  63. select {
  64. case <-timer.C:
  65. default:
  66. }
  67. }
  68. case <-ctx.Done():
  69. timer.Stop()
  70. s.mut.RLock()
  71. for _, mapping := range s.mappings {
  72. mapping.clearAddresses()
  73. }
  74. s.mut.RUnlock()
  75. return ctx.Err()
  76. }
  77. s.mut.RLock()
  78. enabled := s.enabled
  79. s.mut.RUnlock()
  80. if !enabled {
  81. continue
  82. }
  83. found, renewIn := s.process(ctx)
  84. timer.Reset(renewIn)
  85. if found != -1 {
  86. announce.Do(func() {
  87. suffix := "s"
  88. if found == 1 {
  89. suffix = ""
  90. }
  91. l.Infoln("Detected", found, "NAT service"+suffix)
  92. })
  93. }
  94. }
  95. }
  96. func (s *Service) process(ctx context.Context) (int, time.Duration) {
  97. // toRenew are mappings which are due for renewal
  98. // toUpdate are the remaining mappings, which will only be updated if one of
  99. // the old IGDs has gone away, or a new IGD has appeared, but only if we
  100. // actually need to perform a renewal.
  101. var toRenew, toUpdate []*Mapping
  102. renewIn := time.Duration(s.cfg.Options().NATRenewalM) * time.Minute
  103. if renewIn == 0 {
  104. // We always want to do renewal so lets just pick a nice sane number.
  105. renewIn = 30 * time.Minute
  106. }
  107. s.mut.RLock()
  108. for _, mapping := range s.mappings {
  109. mapping.mut.RLock()
  110. expires := mapping.expires
  111. mapping.mut.RUnlock()
  112. if expires.Before(time.Now()) {
  113. toRenew = append(toRenew, mapping)
  114. } else {
  115. toUpdate = append(toUpdate, mapping)
  116. mappingRenewIn := time.Until(expires)
  117. if mappingRenewIn < renewIn {
  118. renewIn = mappingRenewIn
  119. }
  120. }
  121. }
  122. s.mut.RUnlock()
  123. // Don't do anything, unless we really need to renew
  124. if len(toRenew) == 0 {
  125. return -1, renewIn
  126. }
  127. nats := discoverAll(ctx, time.Duration(s.cfg.Options().NATRenewalM)*time.Minute, time.Duration(s.cfg.Options().NATTimeoutS)*time.Second)
  128. for _, mapping := range toRenew {
  129. s.updateMapping(ctx, mapping, nats, true)
  130. }
  131. for _, mapping := range toUpdate {
  132. s.updateMapping(ctx, mapping, nats, false)
  133. }
  134. return len(nats), renewIn
  135. }
  136. func (s *Service) scheduleProcess() {
  137. select {
  138. case s.processScheduled <- struct{}{}: // 1-buffered
  139. default:
  140. }
  141. }
  142. func (s *Service) NewMapping(protocol Protocol, ip net.IP, port int) *Mapping {
  143. mapping := &Mapping{
  144. protocol: protocol,
  145. address: Address{
  146. IP: ip,
  147. Port: port,
  148. },
  149. extAddresses: make(map[string]Address),
  150. mut: sync.NewRWMutex(),
  151. }
  152. s.mut.Lock()
  153. s.mappings = append(s.mappings, mapping)
  154. s.mut.Unlock()
  155. s.scheduleProcess()
  156. return mapping
  157. }
  158. // RemoveMapping does not actually remove the mapping from the IGD, it just
  159. // internally removes it which stops renewing the mapping. Also, it clears any
  160. // existing mapped addresses from the mapping, which as a result should cause
  161. // discovery to reannounce the new addresses.
  162. func (s *Service) RemoveMapping(mapping *Mapping) {
  163. s.mut.Lock()
  164. defer s.mut.Unlock()
  165. for i, existing := range s.mappings {
  166. if existing == mapping {
  167. mapping.clearAddresses()
  168. last := len(s.mappings) - 1
  169. s.mappings[i] = s.mappings[last]
  170. s.mappings[last] = nil
  171. s.mappings = s.mappings[:last]
  172. return
  173. }
  174. }
  175. }
  176. // updateMapping compares the addresses of the existing mapping versus the natds
  177. // discovered, and removes any addresses of natds that do not exist, or tries to
  178. // acquire mappings for natds which the mapping was unaware of before.
  179. // Optionally takes renew flag which indicates whether or not we should renew
  180. // mappings with existing natds
  181. func (s *Service) updateMapping(ctx context.Context, mapping *Mapping, nats map[string]Device, renew bool) {
  182. renewalTime := time.Duration(s.cfg.Options().NATRenewalM) * time.Minute
  183. mapping.mut.Lock()
  184. mapping.expires = time.Now().Add(renewalTime)
  185. change := s.verifyExistingLocked(ctx, mapping, nats, renew)
  186. add := s.acquireNewLocked(ctx, mapping, nats)
  187. mapping.mut.Unlock()
  188. if change || add {
  189. mapping.notify()
  190. }
  191. }
  192. func (s *Service) verifyExistingLocked(ctx context.Context, mapping *Mapping, nats map[string]Device, renew bool) (change bool) {
  193. leaseTime := time.Duration(s.cfg.Options().NATLeaseM) * time.Minute
  194. for id, address := range mapping.extAddresses {
  195. select {
  196. case <-ctx.Done():
  197. return false
  198. default:
  199. }
  200. // Delete addresses for NATDevice's that do not exist anymore
  201. nat, ok := nats[id]
  202. if !ok {
  203. mapping.removeAddressLocked(id)
  204. change = true
  205. continue
  206. } else if renew {
  207. // Only perform renewals on the nat's that have the right local IP
  208. // address
  209. localIP := nat.GetLocalIPAddress()
  210. if !mapping.validGateway(localIP) {
  211. l.Debugf("Skipping %s for %s because of IP mismatch. %s != %s", id, mapping, mapping.address.IP, localIP)
  212. continue
  213. }
  214. l.Debugf("Renewing %s -> %s mapping on %s", mapping, address, id)
  215. addr, err := s.tryNATDevice(ctx, nat, mapping.address.Port, address.Port, leaseTime)
  216. if err != nil {
  217. l.Debugf("Failed to renew %s -> mapping on %s", mapping, address, id)
  218. mapping.removeAddressLocked(id)
  219. change = true
  220. continue
  221. }
  222. l.Debugf("Renewed %s -> %s mapping on %s", mapping, address, id)
  223. if !addr.Equal(address) {
  224. mapping.removeAddressLocked(id)
  225. mapping.setAddressLocked(id, addr)
  226. change = true
  227. }
  228. }
  229. }
  230. return change
  231. }
  232. func (s *Service) acquireNewLocked(ctx context.Context, mapping *Mapping, nats map[string]Device) (change bool) {
  233. leaseTime := time.Duration(s.cfg.Options().NATLeaseM) * time.Minute
  234. addrMap := mapping.extAddresses
  235. for id, nat := range nats {
  236. select {
  237. case <-ctx.Done():
  238. return false
  239. default:
  240. }
  241. if _, ok := addrMap[id]; ok {
  242. continue
  243. }
  244. // Only perform mappings on the nat's that have the right local IP
  245. // address
  246. localIP := nat.GetLocalIPAddress()
  247. if !mapping.validGateway(localIP) {
  248. l.Debugf("Skipping %s for %s because of IP mismatch. %s != %s", id, mapping, mapping.address.IP, localIP)
  249. continue
  250. }
  251. l.Debugf("Acquiring %s mapping on %s", mapping, id)
  252. addr, err := s.tryNATDevice(ctx, nat, mapping.address.Port, 0, leaseTime)
  253. if err != nil {
  254. l.Debugf("Failed to acquire %s mapping on %s", mapping, id)
  255. continue
  256. }
  257. l.Debugf("Acquired %s -> %s mapping on %s", mapping, addr, id)
  258. mapping.setAddressLocked(id, addr)
  259. change = true
  260. }
  261. return change
  262. }
  263. // tryNATDevice tries to acquire a port mapping for the given internal address to
  264. // the given external port. If external port is 0, picks a pseudo-random port.
  265. func (s *Service) tryNATDevice(ctx context.Context, natd Device, intPort, extPort int, leaseTime time.Duration) (Address, error) {
  266. var err error
  267. var port int
  268. // Generate a predictable random which is based on device ID + local port + hash of the device ID
  269. // number so that the ports we'd try to acquire for the mapping would always be the same for the
  270. // same device trying to get the same internal port.
  271. predictableRand := rand.New(rand.NewSource(int64(s.id.Short()) + int64(intPort) + hash(natd.ID())))
  272. if extPort != 0 {
  273. // First try renewing our existing mapping, if we have one.
  274. name := fmt.Sprintf("syncthing-%d", extPort)
  275. port, err = natd.AddPortMapping(ctx, TCP, intPort, extPort, name, leaseTime)
  276. if err == nil {
  277. extPort = port
  278. goto findIP
  279. }
  280. l.Debugln("Error extending lease on", natd.ID(), err)
  281. }
  282. for i := 0; i < 10; i++ {
  283. select {
  284. case <-ctx.Done():
  285. return Address{}, ctx.Err()
  286. default:
  287. }
  288. // Then try up to ten random ports.
  289. extPort = 1024 + predictableRand.Intn(65535-1024)
  290. name := fmt.Sprintf("syncthing-%d", extPort)
  291. port, err = natd.AddPortMapping(ctx, TCP, intPort, extPort, name, leaseTime)
  292. if err == nil {
  293. extPort = port
  294. goto findIP
  295. }
  296. l.Debugln("Error getting new lease on", natd.ID(), err)
  297. }
  298. return Address{}, err
  299. findIP:
  300. ip, err := natd.GetExternalIPAddress(ctx)
  301. if err != nil {
  302. l.Debugln("Error getting external ip on", natd.ID(), err)
  303. ip = nil
  304. }
  305. return Address{
  306. IP: ip,
  307. Port: extPort,
  308. }, nil
  309. }
  310. func (s *Service) String() string {
  311. return fmt.Sprintf("nat.Service@%p", s)
  312. }
  313. func hash(input string) int64 {
  314. h := fnv.New64a()
  315. h.Write([]byte(input))
  316. return int64(h.Sum64())
  317. }