sync.go

// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package eth

import (
	"math/rand"
	"sync/atomic"
	"time"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/eth/downloader"
	"github.com/ethereum/go-ethereum/log"
	"github.com/ethereum/go-ethereum/p2p/discover"
)

const (
	forceSyncCycle      = 10 * time.Second // Time interval to force syncs, even if few peers are available
	minDesiredPeerCount = 5                // Amount of peers desired to start syncing

	// This is the target size for the packs of transactions sent by txsyncLoop.
	// A pack can get larger than this if a single transaction exceeds this size.
	txsyncPackSize = 100 * 1024
)
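
// txsync is a request to relay the given set of pending transactions to a
// specific peer, queued for processing by txsyncLoop.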
type txsync struct {
	p   *peer
	txs []*types.Transaction
}

// syncTransactions starts sending all currently pending transactions to the given peer.
func (pm *ProtocolManager) syncTransactions(p *peer) {
	var txs types.Transactions
	pending, _ := pm.txpool.Pending()
	for _, batch := range pending {
		txs = append(txs, batch...)
	}
	if len(txs) == 0 {
		return
	}
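	// Hand the batch over to txsyncLoop, bailing out if the protocol
	// manager is shutting down in the meantime.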
	select {
	case pm.txsyncCh <- &txsync{p, txs}:
	case <-pm.quitSync:
	}
}

// txsyncLoop takes care of the initial transaction sync for each new
// connection. When a new peer appears, we relay all currently pending
// transactions. In order to minimise egress bandwidth usage, we send
// the transactions in small packs to one peer at a time.
func (pm *ProtocolManager) txsyncLoop() {
	var (
		pending = make(map[discover.NodeID]*txsync)
		sending = false               // whether a send is active
		pack    = new(txsync)         // the pack that is being sent
		done    = make(chan error, 1) // result of the send
	)
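	// Only one send may be in flight at any time; syncs that arrive while a
	// send is active wait in the pending map until picked.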
	// send starts sending a pack of transactions from the sync.
	send := func(s *txsync) {
		// Fill pack with transactions up to the target size.
		size := common.StorageSize(0)
		pack.p = s.p
		pack.txs = pack.txs[:0]
		for i := 0; i < len(s.txs) && size < txsyncPackSize; i++ {
			pack.txs = append(pack.txs, s.txs[i])
			size += s.txs[i].Size()
		}
		// Remove the transactions that will be sent.
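		// copy shifts the unsent tail of s.txs to the front of the slice and
		// returns the number of elements moved, so the reslice drops exactly
		// the transactions that were packed, without allocating a new slice.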
		s.txs = s.txs[:copy(s.txs, s.txs[len(pack.txs):])]
		if len(s.txs) == 0 {
			delete(pending, s.p.ID())
		}
		// Send the pack in the background.
		s.p.Log().Trace("Sending batch of transactions", "count", len(pack.txs), "bytes", size)
		sending = true
		go func() { done <- pack.p.SendTransactions(pack.txs) }()
	}
	// pick chooses the next pending sync.
	pick := func() *txsync {
		if len(pending) == 0 {
			return nil
		}
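		// Choose the n-th entry of the map iteration uniformly at random,
		// so every pending peer has an equal chance of being served next.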
		n := rand.Intn(len(pending)) + 1
		for _, s := range pending {
			if n--; n == 0 {
				return s
			}
		}
		return nil
	}
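	// Event loop: queue incoming sync requests, chain sends back to back and
	// drop peers whose sends fail, until told to quit.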
	for {
		select {
		case s := <-pm.txsyncCh:
			pending[s.p.ID()] = s
			if !sending {
				send(s)
			}
		case err := <-done:
			sending = false
			// Stop tracking peers that cause send failures.
			if err != nil {
				pack.p.Log().Debug("Transaction send failed", "err", err)
				delete(pending, pack.p.ID())
			}
			// Schedule the next send.
			if s := pick(); s != nil {
				send(s)
			}
		case <-pm.quitSync:
			return
		}
	}
}

// syncer is responsible for periodically synchronising with the network, both
// downloading hashes and blocks as well as handling block announcements.
func (pm *ProtocolManager) syncer() {
	// Start and ensure cleanup of sync mechanisms
	pm.fetcher.Start()
	defer pm.fetcher.Stop()
	defer pm.downloader.Terminate()

	// Wait for different events to fire synchronisation operations
	forceSync := time.NewTicker(forceSyncCycle)
	defer forceSync.Stop()

	for {
		select {
		case <-pm.newPeerCh:
			// Make sure we have peers to select from, then sync
			if pm.peers.Len() < minDesiredPeerCount {
				break
			}
			go pm.synchronise(pm.peers.BestPeer())

		case <-forceSync.C:
			// Force a sync even if not enough peers are present
			go pm.synchronise(pm.peers.BestPeer())

		case <-pm.noMorePeers:
			return
		}
	}
}

// synchronise tries to sync up our local block chain with a remote peer.
func (pm *ProtocolManager) synchronise(peer *peer) {
	// Short circuit if no peers are available
	if peer == nil {
		return
	}
	// Make sure the peer's TD is higher than our own
	currentBlock := pm.blockchain.CurrentBlock()
	td := pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64())

	pHead, pTd := peer.Head()
	if pTd.Cmp(td) <= 0 {
		return
	}
	// Otherwise try to sync with the downloader
	mode := downloader.FullSync
	if atomic.LoadUint32(&pm.fastSync) == 1 {
		// Fast sync was explicitly requested, and explicitly granted
		mode = downloader.FastSync
	} else if currentBlock.NumberU64() == 0 && pm.blockchain.CurrentFastBlock().NumberU64() > 0 {
		// The database seems empty as the current block is the genesis. Yet the fast
		// block is ahead, so fast sync was enabled for this node at a certain point.
		// The only scenario where this can happen is if the user manually (or via a
		// bad block) rolled back a fast sync node below the sync point. In this case
		// however it's safe to reenable fast sync.
		atomic.StoreUint32(&pm.fastSync, 1)
		mode = downloader.FastSync
	}
	if mode == downloader.FastSync {
		// Abort if the peer's total difficulty is no higher than that of the
		// fast block we are already synced to.
		if pm.blockchain.GetTdByHash(pm.blockchain.CurrentFastBlock().Hash()).Cmp(pTd) >= 0 {
			return
		}
	}
	// Run the sync cycle, and disable fast sync if we've gone past the pivot block
	if err := pm.downloader.Synchronise(peer.id, pHead, pTd, mode); err != nil {
		return
	}
	if atomic.LoadUint32(&pm.fastSync) == 1 {
		log.Info("Fast sync complete, auto disabling")
		atomic.StoreUint32(&pm.fastSync, 0)
	}
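	// Incoming transactions were being rejected while the initial sync was
	// running; flipping acceptTxs lets the handler start processing them again.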
	atomic.StoreUint32(&pm.acceptTxs, 1) // Mark initial sync done
	if head := pm.blockchain.CurrentBlock(); head.NumberU64() > 0 {
		// We've completed a sync cycle, notify all peers of the new state. This
		// path is essential in star-topology networks where a gateway node needs
		// to notify all its out-of-date peers of the availability of a new block.
		// This failure scenario will most often crop up in private and hackathon
		// networks with degenerate connectivity, but it is healthy for the mainnet
		// too, as it more reliably updates peers and the local TD state.
		go pm.BroadcastBlock(head, false)
	}
}