123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218 |
- // Copyright (C) 2016 The Syncthing Authors.
- //
- // This Source Code Form is subject to the terms of the Mozilla Public
- // License, v. 2.0. If a copy of the MPL was not distributed with this file,
- // You can obtain one at https://mozilla.org/MPL/2.0/.
- package connections
- import (
- "context"
- "crypto/tls"
- "net"
- "net/url"
- "sync"
- "time"
- "github.com/syncthing/syncthing/lib/config"
- "github.com/syncthing/syncthing/lib/connections/registry"
- "github.com/syncthing/syncthing/lib/dialer"
- "github.com/syncthing/syncthing/lib/nat"
- "github.com/syncthing/syncthing/lib/util"
- )
- func init() {
- factory := &tcpListenerFactory{}
- for _, scheme := range []string{"tcp", "tcp4", "tcp6"} {
- listeners[scheme] = factory
- }
- }
- type tcpListener struct {
- util.ServiceWithError
- onAddressesChangedNotifier
- uri *url.URL
- cfg config.Wrapper
- tlsCfg *tls.Config
- conns chan internalConn
- factory listenerFactory
- natService *nat.Service
- mapping *nat.Mapping
- mut sync.RWMutex
- }
- func (t *tcpListener) serve(ctx context.Context) error {
- tcaddr, err := net.ResolveTCPAddr(t.uri.Scheme, t.uri.Host)
- if err != nil {
- l.Infoln("Listen (BEP/tcp):", err)
- return err
- }
- lc := net.ListenConfig{
- Control: dialer.ReusePortControl,
- }
- listener, err := lc.Listen(context.TODO(), t.uri.Scheme, tcaddr.String())
- if err != nil {
- l.Infoln("Listen (BEP/tcp):", err)
- return err
- }
- t.notifyAddressesChanged(t)
- registry.Register(t.uri.Scheme, tcaddr)
- defer listener.Close()
- defer t.clearAddresses(t)
- defer registry.Unregister(t.uri.Scheme, tcaddr)
- l.Infof("TCP listener (%v) starting", listener.Addr())
- defer l.Infof("TCP listener (%v) shutting down", listener.Addr())
- mapping := t.natService.NewMapping(nat.TCP, tcaddr.IP, tcaddr.Port)
- mapping.OnChanged(func(_ *nat.Mapping, _, _ []nat.Address) {
- t.notifyAddressesChanged(t)
- })
- defer t.natService.RemoveMapping(mapping)
- t.mut.Lock()
- t.mapping = mapping
- t.mut.Unlock()
- acceptFailures := 0
- const maxAcceptFailures = 10
- // :(, but what can you do.
- tcpListener := listener.(*net.TCPListener)
- for {
- _ = tcpListener.SetDeadline(time.Now().Add(time.Second))
- conn, err := tcpListener.Accept()
- select {
- case <-ctx.Done():
- if err == nil {
- conn.Close()
- }
- t.mut.Lock()
- t.mapping = nil
- t.mut.Unlock()
- return nil
- default:
- }
- if err != nil {
- if err, ok := err.(*net.OpError); !ok || !err.Timeout() {
- l.Warnln("Listen (BEP/tcp): Accepting connection:", err)
- acceptFailures++
- if acceptFailures > maxAcceptFailures {
- // Return to restart the listener, because something
- // seems permanently damaged.
- return err
- }
- // Slightly increased delay for each failure.
- time.Sleep(time.Duration(acceptFailures) * time.Second)
- }
- continue
- }
- acceptFailures = 0
- l.Debugln("Listen (BEP/tcp): connect from", conn.RemoteAddr())
- if err := dialer.SetTCPOptions(conn); err != nil {
- l.Debugln("Listen (BEP/tcp): setting tcp options:", err)
- }
- if tc := t.cfg.Options().TrafficClass; tc != 0 {
- if err := dialer.SetTrafficClass(conn, tc); err != nil {
- l.Debugln("Listen (BEP/tcp): setting traffic class:", err)
- }
- }
- tc := tls.Server(conn, t.tlsCfg)
- if err := tlsTimedHandshake(tc); err != nil {
- l.Infoln("Listen (BEP/tcp): TLS handshake:", err)
- tc.Close()
- continue
- }
- t.conns <- internalConn{tc, connTypeTCPServer, tcpPriority}
- }
- }
- func (t *tcpListener) URI() *url.URL {
- return t.uri
- }
- func (t *tcpListener) WANAddresses() []*url.URL {
- uris := []*url.URL{t.uri}
- t.mut.RLock()
- if t.mapping != nil {
- addrs := t.mapping.ExternalAddresses()
- for _, addr := range addrs {
- uri := *t.uri
- // Does net.JoinHostPort internally
- uri.Host = addr.String()
- uris = append(uris, &uri)
- // For every address with a specified IP, add one without an IP,
- // just in case the specified IP is still internal (router behind DMZ).
- if len(addr.IP) != 0 && !addr.IP.IsUnspecified() {
- uri = *t.uri
- addr.IP = nil
- uri.Host = addr.String()
- uris = append(uris, &uri)
- }
- }
- }
- t.mut.RUnlock()
- // If we support ReusePort, add an unspecified zero port address, which will be resolved by the discovery server
- // in hopes that TCP punch through works.
- if dialer.SupportsReusePort {
- uri := *t.uri
- uri.Host = "0.0.0.0:0"
- uris = append([]*url.URL{&uri}, uris...)
- }
- return uris
- }
- func (t *tcpListener) LANAddresses() []*url.URL {
- addrs := []*url.URL{t.uri}
- addrs = append(addrs, getURLsForAllAdaptersIfUnspecified(t.uri.Scheme, t.uri)...)
- return addrs
- }
- func (t *tcpListener) String() string {
- return t.uri.String()
- }
- func (t *tcpListener) Factory() listenerFactory {
- return t.factory
- }
- func (t *tcpListener) NATType() string {
- return "unknown"
- }
// tcpListenerFactory creates tcpListener values. It carries no state of its
// own.
type tcpListenerFactory struct{}
- func (f *tcpListenerFactory) New(uri *url.URL, cfg config.Wrapper, tlsCfg *tls.Config, conns chan internalConn, natService *nat.Service) genericListener {
- l := &tcpListener{
- uri: fixupPort(uri, config.DefaultTCPPort),
- cfg: cfg,
- tlsCfg: tlsCfg,
- conns: conns,
- natService: natService,
- factory: f,
- }
- l.ServiceWithError = util.AsService(l.serve, l.String())
- return l
- }
- func (tcpListenerFactory) Valid(_ config.Configuration) error {
- // Always valid
- return nil
- }
|