123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285 |
- // Copyright (C) 2014 The Syncthing Authors.
- //
- // This Source Code Form is subject to the terms of the Mozilla Public
- // License, v. 2.0. If a copy of the MPL was not distributed with this file,
- // You can obtain one at https://mozilla.org/MPL/2.0/.
- package discover
- import (
- "context"
- "encoding/binary"
- "encoding/hex"
- "fmt"
- "io"
- "net"
- "net/url"
- "strconv"
- "time"
- "github.com/syncthing/syncthing/lib/beacon"
- "github.com/syncthing/syncthing/lib/events"
- "github.com/syncthing/syncthing/lib/protocol"
- "github.com/syncthing/syncthing/lib/rand"
- "github.com/syncthing/syncthing/lib/svcutil"
- "github.com/thejerf/suture/v4"
- )
- type localClient struct {
- *suture.Supervisor
- myID protocol.DeviceID
- addrList AddressLister
- name string
- evLogger events.Logger
- beacon beacon.Interface
- localBcastStart time.Time
- localBcastTick <-chan time.Time
- forcedBcastTick chan time.Time
- *cache
- }
const (
	// BroadcastInterval is the period between regular local announcements.
	BroadcastInterval time.Duration = 30 * time.Second
	// CacheLifeTime is how long a received announcement is considered valid.
	CacheLifeTime time.Duration = 3 * BroadcastInterval

	// Magic is the four byte prefix of every local discovery packet.
	Magic uint32 = 0x2EA7D90B // same as in BEP
	// v13Magic is the prefix used by the previous, incompatible version.
	v13Magic uint32 = 0x7D79BC40
)
- func NewLocal(id protocol.DeviceID, addr string, addrList AddressLister, evLogger events.Logger) (FinderService, error) {
- c := &localClient{
- Supervisor: suture.New("local", svcutil.SpecWithDebugLogger(l)),
- myID: id,
- addrList: addrList,
- evLogger: evLogger,
- localBcastTick: time.NewTicker(BroadcastInterval).C,
- forcedBcastTick: make(chan time.Time),
- localBcastStart: time.Now(),
- cache: newCache(),
- }
- host, port, err := net.SplitHostPort(addr)
- if err != nil {
- return nil, err
- }
- if len(host) == 0 {
- // A broadcast client
- c.name = "IPv4 local"
- bcPort, err := strconv.Atoi(port)
- if err != nil {
- return nil, err
- }
- c.beacon = beacon.NewBroadcast(bcPort)
- } else {
- // A multicast client
- c.name = "IPv6 local"
- c.beacon = beacon.NewMulticast(addr)
- }
- c.Add(c.beacon)
- c.Add(svcutil.AsService(c.recvAnnouncements, fmt.Sprintf("%s/recv", c)))
- c.Add(svcutil.AsService(c.sendLocalAnnouncements, fmt.Sprintf("%s/sendLocal", c)))
- return c, nil
- }
- // Lookup returns a list of addresses the device is available at.
- func (c *localClient) Lookup(_ context.Context, device protocol.DeviceID) (addresses []string, err error) {
- if cache, ok := c.Get(device); ok {
- if time.Since(cache.when) < CacheLifeTime {
- addresses = cache.Addresses
- }
- }
- return
- }
- func (c *localClient) String() string {
- return c.name
- }
- func (c *localClient) Error() error {
- return c.beacon.Error()
- }
- // announcementPkt appends the local discovery packet to send to msg. Returns
- // true if the packet should be sent, false if there is nothing useful to
- // send.
- func (c *localClient) announcementPkt(instanceID int64, msg []byte) ([]byte, bool) {
- addrs := c.addrList.AllAddresses()
- if len(addrs) == 0 {
- // Nothing to announce
- return msg, false
- }
- if cap(msg) >= 4 {
- msg = msg[:4]
- } else {
- msg = make([]byte, 4)
- }
- binary.BigEndian.PutUint32(msg, Magic)
- pkt := Announce{
- ID: c.myID,
- Addresses: addrs,
- InstanceID: instanceID,
- }
- bs, _ := pkt.Marshal()
- msg = append(msg, bs...)
- return msg, true
- }
- func (c *localClient) sendLocalAnnouncements(ctx context.Context) error {
- var msg []byte
- var ok bool
- instanceID := rand.Int63()
- for {
- if msg, ok = c.announcementPkt(instanceID, msg[:0]); ok {
- c.beacon.Send(msg)
- }
- select {
- case <-c.localBcastTick:
- case <-c.forcedBcastTick:
- case <-ctx.Done():
- return ctx.Err()
- }
- }
- }
- func (c *localClient) recvAnnouncements(ctx context.Context) error {
- b := c.beacon
- warnedAbout := make(map[string]bool)
- for {
- select {
- case <-ctx.Done():
- return ctx.Err()
- default:
- }
- buf, addr := b.Recv()
- if addr == nil {
- continue
- }
- if len(buf) < 4 {
- l.Debugf("discover: short packet from %s", addr.String())
- continue
- }
- magic := binary.BigEndian.Uint32(buf)
- switch magic {
- case Magic:
- // All good
- case v13Magic:
- // Old version
- if !warnedAbout[addr.String()] {
- l.Warnf("Incompatible (v0.13) local discovery packet from %v - upgrade that device to connect", addr)
- warnedAbout[addr.String()] = true
- }
- continue
- default:
- l.Debugf("discover: Incorrect magic %x from %s", magic, addr)
- continue
- }
- var pkt Announce
- err := pkt.Unmarshal(buf[4:])
- if err != nil && err != io.EOF {
- l.Debugf("discover: Failed to unmarshal local announcement from %s:\n%s", addr, hex.Dump(buf))
- continue
- }
- l.Debugf("discover: Received local announcement from %s for %s", addr, pkt.ID)
- var newDevice bool
- if pkt.ID != c.myID {
- newDevice = c.registerDevice(addr, pkt)
- }
- if newDevice {
- // Force a transmit to announce ourselves, if we are ready to do
- // so right away.
- select {
- case c.forcedBcastTick <- time.Now():
- default:
- }
- }
- }
- }
- func (c *localClient) registerDevice(src net.Addr, device Announce) bool {
- // Remember whether we already had a valid cache entry for this device.
- // If the instance ID has changed the remote device has restarted since
- // we last heard from it, so we should treat it as a new device.
- ce, existsAlready := c.Get(device.ID)
- isNewDevice := !existsAlready || time.Since(ce.when) > CacheLifeTime || ce.instanceID != device.InstanceID
- // Any empty or unspecified addresses should be set to the source address
- // of the announcement. We also skip any addresses we can't parse.
- l.Debugln("discover: Registering addresses for", device.ID)
- var validAddresses []string
- for _, addr := range device.Addresses {
- u, err := url.Parse(addr)
- if err != nil {
- continue
- }
- tcpAddr, err := net.ResolveTCPAddr("tcp", u.Host)
- if err != nil {
- continue
- }
- if len(tcpAddr.IP) == 0 || tcpAddr.IP.IsUnspecified() {
- srcAddr, err := net.ResolveTCPAddr("tcp", src.String())
- if err != nil {
- continue
- }
- // Do not use IPv6 source address if requested scheme is tcp4
- if u.Scheme == "tcp4" && srcAddr.IP.To4() == nil {
- continue
- }
- // Do not use IPv4 source address if requested scheme is tcp6
- if u.Scheme == "tcp6" && srcAddr.IP.To4() != nil {
- continue
- }
- host, _, err := net.SplitHostPort(src.String())
- if err != nil {
- continue
- }
- u.Host = net.JoinHostPort(host, strconv.Itoa(tcpAddr.Port))
- l.Debugf("discover: Reconstructed URL is %#v", u)
- validAddresses = append(validAddresses, u.String())
- l.Debugf("discover: Replaced address %v in %s to get %s", tcpAddr.IP, addr, u.String())
- } else {
- validAddresses = append(validAddresses, addr)
- l.Debugf("discover: Accepted address %s verbatim", addr)
- }
- }
- c.Set(device.ID, CacheEntry{
- Addresses: validAddresses,
- when: time.Now(),
- found: true,
- instanceID: device.InstanceID,
- })
- if isNewDevice {
- c.evLogger.Log(events.DeviceDiscovered, map[string]interface{}{
- "device": device.ID.String(),
- "addrs": validAddresses,
- })
- }
- return isNewDevice
- }
|