global.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477
  1. // Copyright (C) 2015 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package discover
  7. import (
  8. "bytes"
  9. "context"
  10. "crypto/tls"
  11. "encoding/json"
  12. "errors"
  13. "fmt"
  14. "io"
  15. "net"
  16. "net/http"
  17. "net/url"
  18. "strconv"
  19. stdsync "sync"
  20. "time"
  21. "github.com/syncthing/syncthing/lib/connections/registry"
  22. "github.com/syncthing/syncthing/lib/dialer"
  23. "github.com/syncthing/syncthing/lib/events"
  24. "github.com/syncthing/syncthing/lib/protocol"
  25. "golang.org/x/net/http2"
  26. )
  27. type globalClient struct {
  28. server string
  29. addrList AddressLister
  30. announceClient httpClient
  31. queryClient httpClient
  32. noAnnounce bool
  33. noLookup bool
  34. evLogger events.Logger
  35. errorHolder
  36. }
  37. type httpClient interface {
  38. Get(ctx context.Context, url string) (*http.Response, error)
  39. Post(ctx context.Context, url, ctype string, data io.Reader) (*http.Response, error)
  40. }
  41. const (
  42. defaultReannounceInterval = 30 * time.Minute
  43. announceErrorRetryInterval = 5 * time.Minute
  44. requestTimeout = 30 * time.Second
  45. maxAddressChangesBetweenAnnouncements = 10
  46. )
  47. type announcement struct {
  48. Addresses []string `json:"addresses"`
  49. }
  50. func (a announcement) MarshalJSON() ([]byte, error) {
  51. type announcementCopy announcement
  52. a.Addresses = sanitizeRelayAddresses(a.Addresses)
  53. aCopy := announcementCopy(a)
  54. return json.Marshal(aCopy)
  55. }
  56. type serverOptions struct {
  57. insecure bool // don't check certificate
  58. noAnnounce bool // don't announce
  59. noLookup bool // don't use for lookups
  60. id string // expected server device ID
  61. }
  62. // A lookupError is any other error but with a cache validity time attached.
  63. type lookupError struct {
  64. msg string
  65. cacheFor time.Duration
  66. }
  67. func (e *lookupError) Error() string { return e.msg }
  68. func (e *lookupError) CacheFor() time.Duration {
  69. return e.cacheFor
  70. }
  71. func NewGlobal(server string, cert tls.Certificate, addrList AddressLister, evLogger events.Logger, registry *registry.Registry) (FinderService, error) {
  72. server, opts, err := parseOptions(server)
  73. if err != nil {
  74. return nil, err
  75. }
  76. var devID protocol.DeviceID
  77. if opts.id != "" {
  78. devID, err = protocol.DeviceIDFromString(opts.id)
  79. if err != nil {
  80. return nil, err
  81. }
  82. }
  83. // The http.Client used for announcements. It needs to have our
  84. // certificate to prove our identity, and may or may not verify the server
  85. // certificate depending on the insecure setting.
  86. var dialContext func(ctx context.Context, network, addr string) (net.Conn, error)
  87. if registry != nil {
  88. dialContext = dialer.DialContextReusePortFunc(registry)
  89. } else {
  90. dialContext = dialer.DialContext
  91. }
  92. var announceClient httpClient = &contextClient{&http.Client{
  93. Timeout: requestTimeout,
  94. Transport: http2EnabledTransport(&http.Transport{
  95. DialContext: dialContext,
  96. Proxy: http.ProxyFromEnvironment,
  97. DisableKeepAlives: true, // announcements are few and far between, so don't keep the connection open
  98. TLSClientConfig: &tls.Config{
  99. InsecureSkipVerify: opts.insecure,
  100. Certificates: []tls.Certificate{cert},
  101. MinVersion: tls.VersionTLS12,
  102. },
  103. }),
  104. }}
  105. if opts.id != "" {
  106. announceClient = newIDCheckingHTTPClient(announceClient, devID)
  107. }
  108. // The http.Client used for queries. We don't need to present our
  109. // certificate here, so lets not include it. May be insecure if requested.
  110. var queryClient httpClient = &contextClient{&http.Client{
  111. Timeout: requestTimeout,
  112. Transport: http2EnabledTransport(&http.Transport{
  113. DialContext: dialer.DialContext,
  114. Proxy: http.ProxyFromEnvironment,
  115. IdleConnTimeout: time.Second,
  116. TLSClientConfig: &tls.Config{
  117. InsecureSkipVerify: opts.insecure,
  118. MinVersion: tls.VersionTLS12,
  119. },
  120. }),
  121. }}
  122. if opts.id != "" {
  123. queryClient = newIDCheckingHTTPClient(queryClient, devID)
  124. }
  125. cl := &globalClient{
  126. server: server,
  127. addrList: addrList,
  128. announceClient: announceClient,
  129. queryClient: queryClient,
  130. noAnnounce: opts.noAnnounce,
  131. noLookup: opts.noLookup,
  132. evLogger: evLogger,
  133. }
  134. if !opts.noAnnounce {
  135. // If we are supposed to announce, it's an error until we've done so.
  136. cl.setError(errors.New("not announced"))
  137. }
  138. return cl, nil
  139. }
  140. // Lookup returns the list of addresses where the given device is available
  141. func (c *globalClient) Lookup(ctx context.Context, device protocol.DeviceID) (addresses []string, err error) {
  142. if c.noLookup {
  143. return nil, &lookupError{
  144. msg: "lookups not supported",
  145. cacheFor: time.Hour,
  146. }
  147. }
  148. qURL, err := url.Parse(c.server)
  149. if err != nil {
  150. return nil, err
  151. }
  152. q := qURL.Query()
  153. q.Set("device", device.String())
  154. qURL.RawQuery = q.Encode()
  155. resp, err := c.queryClient.Get(ctx, qURL.String())
  156. if err != nil {
  157. l.Debugln("globalClient.Lookup", qURL, err)
  158. return nil, err
  159. }
  160. if resp.StatusCode != http.StatusOK {
  161. resp.Body.Close()
  162. l.Debugln("globalClient.Lookup", qURL, resp.Status)
  163. err := errors.New(resp.Status)
  164. if secs, atoiErr := strconv.Atoi(resp.Header.Get("Retry-After")); atoiErr == nil && secs > 0 {
  165. err = &lookupError{
  166. msg: resp.Status,
  167. cacheFor: time.Duration(secs) * time.Second,
  168. }
  169. }
  170. return nil, err
  171. }
  172. bs, err := io.ReadAll(resp.Body)
  173. if err != nil {
  174. return nil, err
  175. }
  176. resp.Body.Close()
  177. var ann announcement
  178. err = json.Unmarshal(bs, &ann)
  179. return ann.Addresses, err
  180. }
  181. func (c *globalClient) String() string {
  182. return "global@" + c.server
  183. }
  184. func (c *globalClient) Serve(ctx context.Context) error {
  185. if c.noAnnounce {
  186. // We're configured to not do announcements, only lookups. To maintain
  187. // the same interface, we just pause here if Serve() is run.
  188. <-ctx.Done()
  189. return ctx.Err()
  190. }
  191. timer := time.NewTimer(5 * time.Second)
  192. defer timer.Stop()
  193. eventSub := c.evLogger.Subscribe(events.ListenAddressesChanged)
  194. defer eventSub.Unsubscribe()
  195. timerResetCount := 0
  196. for {
  197. select {
  198. case <-eventSub.C():
  199. if timerResetCount < maxAddressChangesBetweenAnnouncements {
  200. // Defer announcement by 2 seconds, essentially debouncing
  201. // if we have a stream of events incoming in quick succession.
  202. timer.Reset(2 * time.Second)
  203. } else if timerResetCount == maxAddressChangesBetweenAnnouncements {
  204. // Yet only do it if we haven't had to reset maxAddressChangesBetweenAnnouncements times in a row,
  205. // so if something is flip-flopping within 2 seconds, we don't end up in a permanent reset loop.
  206. l.Warnf("Detected a flip-flopping listener")
  207. c.setError(errors.New("flip flopping listener"))
  208. // Incrementing the count above 10 will prevent us from warning or setting the error again
  209. // It will also suppress event based resets until we've had a proper round after announceErrorRetryInterval
  210. timer.Reset(announceErrorRetryInterval)
  211. }
  212. timerResetCount++
  213. case <-timer.C:
  214. timerResetCount = 0
  215. c.sendAnnouncement(ctx, timer)
  216. case <-ctx.Done():
  217. return ctx.Err()
  218. }
  219. }
  220. }
  221. func (c *globalClient) sendAnnouncement(ctx context.Context, timer *time.Timer) {
  222. var ann announcement
  223. if c.addrList != nil {
  224. ann.Addresses = c.addrList.ExternalAddresses()
  225. }
  226. if len(ann.Addresses) == 0 {
  227. // There are legitimate cases for not having anything to announce,
  228. // yet still using global discovery for lookups. Do not error out
  229. // here.
  230. c.setError(nil)
  231. timer.Reset(announceErrorRetryInterval)
  232. return
  233. }
  234. // The marshal doesn't fail, I promise.
  235. postData, _ := json.Marshal(ann)
  236. l.Debugf("%s Announcement: %v", c, ann)
  237. resp, err := c.announceClient.Post(ctx, c.server, "application/json", bytes.NewReader(postData))
  238. if err != nil {
  239. l.Debugln(c, "announce POST:", err)
  240. c.setError(err)
  241. timer.Reset(announceErrorRetryInterval)
  242. return
  243. }
  244. l.Debugln(c, "announce POST:", resp.Status)
  245. resp.Body.Close()
  246. if resp.StatusCode < 200 || resp.StatusCode > 299 {
  247. l.Debugln(c, "announce POST:", resp.Status)
  248. c.setError(errors.New(resp.Status))
  249. if h := resp.Header.Get("Retry-After"); h != "" {
  250. // The server has a recommendation on when we should
  251. // retry. Follow it.
  252. if secs, err := strconv.Atoi(h); err == nil && secs > 0 {
  253. l.Debugln(c, "announce Retry-After:", secs, err)
  254. timer.Reset(time.Duration(secs) * time.Second)
  255. return
  256. }
  257. }
  258. timer.Reset(announceErrorRetryInterval)
  259. return
  260. }
  261. c.setError(nil)
  262. if h := resp.Header.Get("Reannounce-After"); h != "" {
  263. // The server has a recommendation on when we should
  264. // reannounce. Follow it.
  265. if secs, err := strconv.Atoi(h); err == nil && secs > 0 {
  266. l.Debugln(c, "announce Reannounce-After:", secs, err)
  267. timer.Reset(time.Duration(secs) * time.Second)
  268. return
  269. }
  270. }
  271. timer.Reset(defaultReannounceInterval)
  272. }
  273. func (*globalClient) Cache() map[protocol.DeviceID]CacheEntry {
  274. // The globalClient doesn't do caching
  275. return nil
  276. }
  277. // parseOptions parses and strips away any ?query=val options, setting the
  278. // corresponding field in the serverOptions struct. Unknown query options are
  279. // ignored and removed.
  280. func parseOptions(dsn string) (server string, opts serverOptions, err error) {
  281. p, err := url.Parse(dsn)
  282. if err != nil {
  283. return "", serverOptions{}, err
  284. }
  285. // Grab known options from the query string
  286. q := p.Query()
  287. opts.id = q.Get("id")
  288. opts.insecure = opts.id != "" || queryBool(q, "insecure")
  289. opts.noAnnounce = queryBool(q, "noannounce")
  290. opts.noLookup = queryBool(q, "nolookup")
  291. // Check for disallowed combinations
  292. if p.Scheme == "http" {
  293. if !opts.insecure {
  294. return "", serverOptions{}, errors.New("http without insecure not supported")
  295. }
  296. if !opts.noAnnounce {
  297. return "", serverOptions{}, errors.New("http without noannounce not supported")
  298. }
  299. } else if p.Scheme != "https" {
  300. return "", serverOptions{}, errors.New("unsupported scheme " + p.Scheme)
  301. }
  302. // Remove the query string
  303. p.RawQuery = ""
  304. server = p.String()
  305. return
  306. }
  307. // queryBool returns the query parameter parsed as a boolean. An empty value
  308. // ("?foo") is considered true, as is any value string except false
  309. // ("?foo=false").
  310. func queryBool(q url.Values, key string) bool {
  311. if _, ok := q[key]; !ok {
  312. return false
  313. }
  314. return q.Get(key) != "false"
  315. }
  316. type idCheckingHTTPClient struct {
  317. httpClient
  318. id protocol.DeviceID
  319. }
  320. func newIDCheckingHTTPClient(client httpClient, id protocol.DeviceID) *idCheckingHTTPClient {
  321. return &idCheckingHTTPClient{
  322. httpClient: client,
  323. id: id,
  324. }
  325. }
  326. func (c *idCheckingHTTPClient) check(resp *http.Response) error {
  327. if resp.TLS == nil {
  328. return errors.New("security: not TLS")
  329. }
  330. if len(resp.TLS.PeerCertificates) == 0 {
  331. return errors.New("security: no certificates")
  332. }
  333. id := protocol.NewDeviceID(resp.TLS.PeerCertificates[0].Raw)
  334. if !id.Equals(c.id) {
  335. return errors.New("security: incorrect device id")
  336. }
  337. return nil
  338. }
  339. func (c *idCheckingHTTPClient) Get(ctx context.Context, url string) (*http.Response, error) {
  340. resp, err := c.httpClient.Get(ctx, url)
  341. if err != nil {
  342. return nil, err
  343. }
  344. if err := c.check(resp); err != nil {
  345. return nil, err
  346. }
  347. return resp, nil
  348. }
  349. func (c *idCheckingHTTPClient) Post(ctx context.Context, url, ctype string, data io.Reader) (*http.Response, error) {
  350. resp, err := c.httpClient.Post(ctx, url, ctype, data)
  351. if err != nil {
  352. return nil, err
  353. }
  354. if err := c.check(resp); err != nil {
  355. return nil, err
  356. }
  357. return resp, nil
  358. }
  359. type errorHolder struct {
  360. err error
  361. mut stdsync.Mutex // uses stdlib sync as I want this to be trivially embeddable, and there is no risk of blocking
  362. }
  363. func (e *errorHolder) setError(err error) {
  364. e.mut.Lock()
  365. e.err = err
  366. e.mut.Unlock()
  367. }
  368. func (e *errorHolder) Error() error {
  369. e.mut.Lock()
  370. err := e.err
  371. e.mut.Unlock()
  372. return err
  373. }
  374. type contextClient struct {
  375. *http.Client
  376. }
  377. func (c *contextClient) Get(ctx context.Context, url string) (*http.Response, error) {
  378. req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
  379. if err != nil {
  380. return nil, err
  381. }
  382. return c.Client.Do(req)
  383. }
  384. func (c *contextClient) Post(ctx context.Context, url, ctype string, data io.Reader) (*http.Response, error) {
  385. req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, data)
  386. if err != nil {
  387. return nil, err
  388. }
  389. req.Header.Set("Content-Type", ctype)
  390. return c.Client.Do(req)
  391. }
  392. func globalDiscoveryIdentity(addr string) string {
  393. return "global discovery server " + addr
  394. }
  395. func ipv4Identity(port int) string {
  396. return fmt.Sprintf("IPv4 local broadcast discovery on port %d", port)
  397. }
  398. func ipv6Identity(addr string) string {
  399. return fmt.Sprintf("IPv6 local multicast discovery on address %s", addr)
  400. }
  401. func http2EnabledTransport(t *http.Transport) *http.Transport {
  402. _ = http2.ConfigureTransport(t)
  403. return t
  404. }