replication.go 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325
  1. // Copyright (C) 2018 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package main
  7. import (
  8. "context"
  9. "crypto/tls"
  10. "encoding/binary"
  11. "fmt"
  12. io "io"
  13. "log"
  14. "net"
  15. "time"
  16. "github.com/syncthing/syncthing/lib/protocol"
  17. )
  18. const (
  19. replicationReadTimeout = time.Minute
  20. replicationWriteTimeout = 30 * time.Second
  21. replicationHeartbeatInterval = time.Second * 30
  22. )
  23. type replicator interface {
  24. send(key string, addrs []DatabaseAddress, seen int64)
  25. }
  26. // a replicationSender tries to connect to the remote address and provide
  27. // them with a feed of replication updates.
  28. type replicationSender struct {
  29. dst string
  30. cert tls.Certificate // our certificate
  31. allowedIDs []protocol.DeviceID
  32. outbox chan ReplicationRecord
  33. }
  34. func newReplicationSender(dst string, cert tls.Certificate, allowedIDs []protocol.DeviceID) *replicationSender {
  35. return &replicationSender{
  36. dst: dst,
  37. cert: cert,
  38. allowedIDs: allowedIDs,
  39. outbox: make(chan ReplicationRecord, replicationOutboxSize),
  40. }
  41. }
  42. func (s *replicationSender) Serve(ctx context.Context) error {
  43. // Sleep a little at startup. Peers often restart at the same time, and
  44. // this avoid the service failing and entering backoff state
  45. // unnecessarily, while also reducing the reconnect rate to something
  46. // reasonable by default.
  47. time.Sleep(2 * time.Second)
  48. tlsCfg := &tls.Config{
  49. Certificates: []tls.Certificate{s.cert},
  50. MinVersion: tls.VersionTLS12,
  51. InsecureSkipVerify: true,
  52. }
  53. // Dial the TLS connection.
  54. conn, err := tls.Dial("tcp", s.dst, tlsCfg)
  55. if err != nil {
  56. log.Println("Replication connect:", err)
  57. return err
  58. }
  59. defer func() {
  60. conn.SetWriteDeadline(time.Now().Add(time.Second))
  61. conn.Close()
  62. }()
  63. // The replication stream is not especially latency sensitive, but it is
  64. // quite a lot of data in small writes. Make it more efficient.
  65. if tcpc, ok := conn.NetConn().(*net.TCPConn); ok {
  66. _ = tcpc.SetNoDelay(false)
  67. }
  68. // Get the other side device ID.
  69. remoteID, err := deviceID(conn)
  70. if err != nil {
  71. log.Println("Replication connect:", err)
  72. return err
  73. }
  74. // Verify it's in the set of allowed device IDs.
  75. if !deviceIDIn(remoteID, s.allowedIDs) {
  76. log.Println("Replication connect: unexpected device ID:", remoteID)
  77. return err
  78. }
  79. heartBeatTicker := time.NewTicker(replicationHeartbeatInterval)
  80. defer heartBeatTicker.Stop()
  81. // Send records.
  82. buf := make([]byte, 1024)
  83. for {
  84. select {
  85. case <-heartBeatTicker.C:
  86. if len(s.outbox) > 0 {
  87. // No need to send heartbeats if there are events/prevrious
  88. // heartbeats to send, they will keep the connection alive.
  89. continue
  90. }
  91. // Empty replication message is the heartbeat:
  92. s.outbox <- ReplicationRecord{}
  93. case rec := <-s.outbox:
  94. // Buffer must hold record plus four bytes for size
  95. size := rec.Size()
  96. if len(buf) < size+4 {
  97. buf = make([]byte, size+4)
  98. }
  99. // Record comes after the four bytes size
  100. n, err := rec.MarshalTo(buf[4:])
  101. if err != nil {
  102. // odd to get an error here, but we haven't sent anything
  103. // yet so it's not fatal
  104. replicationSendsTotal.WithLabelValues("error").Inc()
  105. log.Println("Replication marshal:", err)
  106. continue
  107. }
  108. binary.BigEndian.PutUint32(buf, uint32(n))
  109. // Send
  110. conn.SetWriteDeadline(time.Now().Add(replicationWriteTimeout))
  111. if _, err := conn.Write(buf[:4+n]); err != nil {
  112. replicationSendsTotal.WithLabelValues("error").Inc()
  113. log.Println("Replication write:", err)
  114. // Yes, we are losing the replication event here.
  115. return err
  116. }
  117. replicationSendsTotal.WithLabelValues("success").Inc()
  118. case <-ctx.Done():
  119. return nil
  120. }
  121. }
  122. }
  123. func (s *replicationSender) String() string {
  124. return fmt.Sprintf("replicationSender(%q)", s.dst)
  125. }
  126. func (s *replicationSender) send(key string, ps []DatabaseAddress, _ int64) {
  127. item := ReplicationRecord{
  128. Key: key,
  129. Addresses: ps,
  130. }
  131. // The send should never block. The inbox is suitably buffered for at
  132. // least a few seconds of stalls, which shouldn't happen in practice.
  133. select {
  134. case s.outbox <- item:
  135. default:
  136. replicationSendsTotal.WithLabelValues("drop").Inc()
  137. }
  138. }
  139. // a replicationMultiplexer sends to multiple replicators
  140. type replicationMultiplexer []replicator
  141. func (m replicationMultiplexer) send(key string, ps []DatabaseAddress, seen int64) {
  142. for _, s := range m {
  143. // each send is nonblocking
  144. s.send(key, ps, seen)
  145. }
  146. }
  147. // replicationListener accepts incoming connections and reads replication
  148. // items from them. Incoming items are applied to the KV store.
  149. type replicationListener struct {
  150. addr string
  151. cert tls.Certificate
  152. allowedIDs []protocol.DeviceID
  153. db database
  154. }
  155. func newReplicationListener(addr string, cert tls.Certificate, allowedIDs []protocol.DeviceID, db database) *replicationListener {
  156. return &replicationListener{
  157. addr: addr,
  158. cert: cert,
  159. allowedIDs: allowedIDs,
  160. db: db,
  161. }
  162. }
  163. func (l *replicationListener) Serve(ctx context.Context) error {
  164. tlsCfg := &tls.Config{
  165. Certificates: []tls.Certificate{l.cert},
  166. ClientAuth: tls.RequestClientCert,
  167. MinVersion: tls.VersionTLS12,
  168. InsecureSkipVerify: true,
  169. }
  170. lst, err := tls.Listen("tcp", l.addr, tlsCfg)
  171. if err != nil {
  172. log.Println("Replication listen:", err)
  173. return err
  174. }
  175. defer lst.Close()
  176. for {
  177. select {
  178. case <-ctx.Done():
  179. return nil
  180. default:
  181. }
  182. // Accept a connection
  183. conn, err := lst.Accept()
  184. if err != nil {
  185. log.Println("Replication accept:", err)
  186. return err
  187. }
  188. // Figure out the other side device ID
  189. remoteID, err := deviceID(conn.(*tls.Conn))
  190. if err != nil {
  191. log.Println("Replication accept:", err)
  192. conn.SetWriteDeadline(time.Now().Add(time.Second))
  193. conn.Close()
  194. continue
  195. }
  196. // Verify it is in the set of allowed device IDs
  197. if !deviceIDIn(remoteID, l.allowedIDs) {
  198. log.Println("Replication accept: unexpected device ID:", remoteID)
  199. conn.SetWriteDeadline(time.Now().Add(time.Second))
  200. conn.Close()
  201. continue
  202. }
  203. go l.handle(ctx, conn)
  204. }
  205. }
  206. func (l *replicationListener) String() string {
  207. return fmt.Sprintf("replicationListener(%q)", l.addr)
  208. }
  209. func (l *replicationListener) handle(ctx context.Context, conn net.Conn) {
  210. defer func() {
  211. conn.SetWriteDeadline(time.Now().Add(time.Second))
  212. conn.Close()
  213. }()
  214. buf := make([]byte, 1024)
  215. for {
  216. select {
  217. case <-ctx.Done():
  218. return
  219. default:
  220. }
  221. conn.SetReadDeadline(time.Now().Add(replicationReadTimeout))
  222. // First four bytes are the size
  223. if _, err := io.ReadFull(conn, buf[:4]); err != nil {
  224. log.Println("Replication read size:", err)
  225. replicationRecvsTotal.WithLabelValues("error").Inc()
  226. return
  227. }
  228. // Read the rest of the record
  229. size := int(binary.BigEndian.Uint32(buf[:4]))
  230. if len(buf) < size {
  231. buf = make([]byte, size)
  232. }
  233. if size == 0 {
  234. // Heartbeat, ignore
  235. continue
  236. }
  237. if _, err := io.ReadFull(conn, buf[:size]); err != nil {
  238. log.Println("Replication read record:", err)
  239. replicationRecvsTotal.WithLabelValues("error").Inc()
  240. return
  241. }
  242. // Unmarshal
  243. var rec ReplicationRecord
  244. if err := rec.Unmarshal(buf[:size]); err != nil {
  245. log.Println("Replication unmarshal:", err)
  246. replicationRecvsTotal.WithLabelValues("error").Inc()
  247. continue
  248. }
  249. // Store
  250. l.db.merge(rec.Key, rec.Addresses, rec.Seen)
  251. replicationRecvsTotal.WithLabelValues("success").Inc()
  252. }
  253. }
  254. func deviceID(conn *tls.Conn) (protocol.DeviceID, error) {
  255. // Handshake may not be complete on the server side yet, which we need
  256. // to get the client certificate.
  257. if !conn.ConnectionState().HandshakeComplete {
  258. if err := conn.Handshake(); err != nil {
  259. return protocol.DeviceID{}, err
  260. }
  261. }
  262. // We expect exactly one certificate.
  263. certs := conn.ConnectionState().PeerCertificates
  264. if len(certs) != 1 {
  265. return protocol.DeviceID{}, fmt.Errorf("unexpected number of certificates (%d != 1)", len(certs))
  266. }
  267. return protocol.NewDeviceID(certs[0].Raw), nil
  268. }
  269. func deviceIDIn(id protocol.DeviceID, ids []protocol.DeviceID) bool {
  270. for _, candidate := range ids {
  271. if id == candidate {
  272. return true
  273. }
  274. }
  275. return false
  276. }