tcp_listen.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218
  1. // Copyright (C) 2016 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package connections
  7. import (
  8. "context"
  9. "crypto/tls"
  10. "net"
  11. "net/url"
  12. "sync"
  13. "time"
  14. "github.com/syncthing/syncthing/lib/config"
  15. "github.com/syncthing/syncthing/lib/connections/registry"
  16. "github.com/syncthing/syncthing/lib/dialer"
  17. "github.com/syncthing/syncthing/lib/nat"
  18. "github.com/syncthing/syncthing/lib/util"
  19. )
  20. func init() {
  21. factory := &tcpListenerFactory{}
  22. for _, scheme := range []string{"tcp", "tcp4", "tcp6"} {
  23. listeners[scheme] = factory
  24. }
  25. }
  26. type tcpListener struct {
  27. util.ServiceWithError
  28. onAddressesChangedNotifier
  29. uri *url.URL
  30. cfg config.Wrapper
  31. tlsCfg *tls.Config
  32. conns chan internalConn
  33. factory listenerFactory
  34. natService *nat.Service
  35. mapping *nat.Mapping
  36. mut sync.RWMutex
  37. }
  38. func (t *tcpListener) serve(ctx context.Context) error {
  39. tcaddr, err := net.ResolveTCPAddr(t.uri.Scheme, t.uri.Host)
  40. if err != nil {
  41. l.Infoln("Listen (BEP/tcp):", err)
  42. return err
  43. }
  44. lc := net.ListenConfig{
  45. Control: dialer.ReusePortControl,
  46. }
  47. listener, err := lc.Listen(context.TODO(), t.uri.Scheme, tcaddr.String())
  48. if err != nil {
  49. l.Infoln("Listen (BEP/tcp):", err)
  50. return err
  51. }
  52. t.notifyAddressesChanged(t)
  53. registry.Register(t.uri.Scheme, tcaddr)
  54. defer listener.Close()
  55. defer t.clearAddresses(t)
  56. defer registry.Unregister(t.uri.Scheme, tcaddr)
  57. l.Infof("TCP listener (%v) starting", listener.Addr())
  58. defer l.Infof("TCP listener (%v) shutting down", listener.Addr())
  59. mapping := t.natService.NewMapping(nat.TCP, tcaddr.IP, tcaddr.Port)
  60. mapping.OnChanged(func(_ *nat.Mapping, _, _ []nat.Address) {
  61. t.notifyAddressesChanged(t)
  62. })
  63. defer t.natService.RemoveMapping(mapping)
  64. t.mut.Lock()
  65. t.mapping = mapping
  66. t.mut.Unlock()
  67. acceptFailures := 0
  68. const maxAcceptFailures = 10
  69. // :(, but what can you do.
  70. tcpListener := listener.(*net.TCPListener)
  71. for {
  72. _ = tcpListener.SetDeadline(time.Now().Add(time.Second))
  73. conn, err := tcpListener.Accept()
  74. select {
  75. case <-ctx.Done():
  76. if err == nil {
  77. conn.Close()
  78. }
  79. t.mut.Lock()
  80. t.mapping = nil
  81. t.mut.Unlock()
  82. return nil
  83. default:
  84. }
  85. if err != nil {
  86. if err, ok := err.(*net.OpError); !ok || !err.Timeout() {
  87. l.Warnln("Listen (BEP/tcp): Accepting connection:", err)
  88. acceptFailures++
  89. if acceptFailures > maxAcceptFailures {
  90. // Return to restart the listener, because something
  91. // seems permanently damaged.
  92. return err
  93. }
  94. // Slightly increased delay for each failure.
  95. time.Sleep(time.Duration(acceptFailures) * time.Second)
  96. }
  97. continue
  98. }
  99. acceptFailures = 0
  100. l.Debugln("Listen (BEP/tcp): connect from", conn.RemoteAddr())
  101. if err := dialer.SetTCPOptions(conn); err != nil {
  102. l.Debugln("Listen (BEP/tcp): setting tcp options:", err)
  103. }
  104. if tc := t.cfg.Options().TrafficClass; tc != 0 {
  105. if err := dialer.SetTrafficClass(conn, tc); err != nil {
  106. l.Debugln("Listen (BEP/tcp): setting traffic class:", err)
  107. }
  108. }
  109. tc := tls.Server(conn, t.tlsCfg)
  110. if err := tlsTimedHandshake(tc); err != nil {
  111. l.Infoln("Listen (BEP/tcp): TLS handshake:", err)
  112. tc.Close()
  113. continue
  114. }
  115. t.conns <- internalConn{tc, connTypeTCPServer, tcpPriority}
  116. }
  117. }
  118. func (t *tcpListener) URI() *url.URL {
  119. return t.uri
  120. }
  121. func (t *tcpListener) WANAddresses() []*url.URL {
  122. uris := []*url.URL{t.uri}
  123. t.mut.RLock()
  124. if t.mapping != nil {
  125. addrs := t.mapping.ExternalAddresses()
  126. for _, addr := range addrs {
  127. uri := *t.uri
  128. // Does net.JoinHostPort internally
  129. uri.Host = addr.String()
  130. uris = append(uris, &uri)
  131. // For every address with a specified IP, add one without an IP,
  132. // just in case the specified IP is still internal (router behind DMZ).
  133. if len(addr.IP) != 0 && !addr.IP.IsUnspecified() {
  134. uri = *t.uri
  135. addr.IP = nil
  136. uri.Host = addr.String()
  137. uris = append(uris, &uri)
  138. }
  139. }
  140. }
  141. t.mut.RUnlock()
  142. // If we support ReusePort, add an unspecified zero port address, which will be resolved by the discovery server
  143. // in hopes that TCP punch through works.
  144. if dialer.SupportsReusePort {
  145. uri := *t.uri
  146. uri.Host = "0.0.0.0:0"
  147. uris = append([]*url.URL{&uri}, uris...)
  148. }
  149. return uris
  150. }
  151. func (t *tcpListener) LANAddresses() []*url.URL {
  152. addrs := []*url.URL{t.uri}
  153. addrs = append(addrs, getURLsForAllAdaptersIfUnspecified(t.uri.Scheme, t.uri)...)
  154. return addrs
  155. }
  156. func (t *tcpListener) String() string {
  157. return t.uri.String()
  158. }
  159. func (t *tcpListener) Factory() listenerFactory {
  160. return t.factory
  161. }
  162. func (t *tcpListener) NATType() string {
  163. return "unknown"
  164. }
  165. type tcpListenerFactory struct{}
  166. func (f *tcpListenerFactory) New(uri *url.URL, cfg config.Wrapper, tlsCfg *tls.Config, conns chan internalConn, natService *nat.Service) genericListener {
  167. l := &tcpListener{
  168. uri: fixupPort(uri, config.DefaultTCPPort),
  169. cfg: cfg,
  170. tlsCfg: tlsCfg,
  171. conns: conns,
  172. natService: natService,
  173. factory: f,
  174. }
  175. l.ServiceWithError = util.AsService(l.serve, l.String())
  176. return l
  177. }
  178. func (tcpListenerFactory) Valid(_ config.Configuration) error {
  179. // Always valid
  180. return nil
  181. }