fetcher.go 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737
  1. // Copyright 2015 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. // Package fetcher contains the block announcement based synchronisation.
  17. package fetcher
  18. import (
  19. "errors"
  20. "math/rand"
  21. "time"
  22. "github.com/ethereum/go-ethereum/common"
  23. "github.com/ethereum/go-ethereum/consensus"
  24. "github.com/ethereum/go-ethereum/core/types"
  25. "github.com/ethereum/go-ethereum/log"
  26. "gopkg.in/karalabe/cookiejar.v2/collections/prque"
  27. )
  28. const (
  29. arriveTimeout = 500 * time.Millisecond // Time allowance before an announced block is explicitly requested
  30. gatherSlack = 100 * time.Millisecond // Interval used to collate almost-expired announces with fetches
  31. fetchTimeout = 5 * time.Second // Maximum allotted time to return an explicitly requested block
  32. maxUncleDist = 7 // Maximum allowed backward distance from the chain head
  33. maxQueueDist = 32 // Maximum allowed distance from the chain head to queue
  34. hashLimit = 256 // Maximum number of unique blocks a peer may have announced
  35. blockLimit = 64 // Maximum number of unique blocks a peer may have delivered
  36. )
  37. var (
  38. errTerminated = errors.New("terminated")
  39. )
  40. // blockRetrievalFn is a callback type for retrieving a block from the local chain.
  41. type blockRetrievalFn func(common.Hash) *types.Block
  42. // headerRequesterFn is a callback type for sending a header retrieval request.
  43. type headerRequesterFn func(common.Hash) error
  44. // bodyRequesterFn is a callback type for sending a body retrieval request.
  45. type bodyRequesterFn func([]common.Hash) error
  46. // headerVerifierFn is a callback type to verify a block's header for fast propagation.
  47. type headerVerifierFn func(header *types.Header) error
  48. // blockBroadcasterFn is a callback type for broadcasting a block to connected peers.
  49. type blockBroadcasterFn func(block *types.Block, propagate bool)
  50. // chainHeightFn is a callback type to retrieve the current chain height.
  51. type chainHeightFn func() uint64
  52. // chainInsertFn is a callback type to insert a batch of blocks into the local chain.
  53. type chainInsertFn func(types.Blocks) (int, error)
  54. // peerDropFn is a callback type for dropping a peer detected as malicious.
  55. type peerDropFn func(id string)
  56. // announce is the hash notification of the availability of a new block in the
  57. // network.
  58. type announce struct {
  59. hash common.Hash // Hash of the block being announced
  60. number uint64 // Number of the block being announced (0 = unknown | old protocol)
  61. header *types.Header // Header of the block partially reassembled (new protocol)
  62. time time.Time // Timestamp of the announcement
  63. origin string // Identifier of the peer originating the notification
  64. fetchHeader headerRequesterFn // Fetcher function to retrieve the header of an announced block
  65. fetchBodies bodyRequesterFn // Fetcher function to retrieve the body of an announced block
  66. }
  67. // headerFilterTask represents a batch of headers needing fetcher filtering.
  68. type headerFilterTask struct {
  69. peer string // The source peer of block headers
  70. headers []*types.Header // Collection of headers to filter
  71. time time.Time // Arrival time of the headers
  72. }
  73. // headerFilterTask represents a batch of block bodies (transactions and uncles)
  74. // needing fetcher filtering.
  75. type bodyFilterTask struct {
  76. peer string // The source peer of block bodies
  77. transactions [][]*types.Transaction // Collection of transactions per block bodies
  78. uncles [][]*types.Header // Collection of uncles per block bodies
  79. time time.Time // Arrival time of the blocks' contents
  80. }
  81. // inject represents a schedules import operation.
  82. type inject struct {
  83. origin string
  84. block *types.Block
  85. }
  86. // Fetcher is responsible for accumulating block announcements from various peers
  87. // and scheduling them for retrieval.
  88. type Fetcher struct {
  89. // Various event channels
  90. notify chan *announce
  91. inject chan *inject
  92. blockFilter chan chan []*types.Block
  93. headerFilter chan chan *headerFilterTask
  94. bodyFilter chan chan *bodyFilterTask
  95. done chan common.Hash
  96. quit chan struct{}
  97. // Announce states
  98. announces map[string]int // Per peer announce counts to prevent memory exhaustion
  99. announced map[common.Hash][]*announce // Announced blocks, scheduled for fetching
  100. fetching map[common.Hash]*announce // Announced blocks, currently fetching
  101. fetched map[common.Hash][]*announce // Blocks with headers fetched, scheduled for body retrieval
  102. completing map[common.Hash]*announce // Blocks with headers, currently body-completing
  103. // Block cache
  104. queue *prque.Prque // Queue containing the import operations (block number sorted)
  105. queues map[string]int // Per peer block counts to prevent memory exhaustion
  106. queued map[common.Hash]*inject // Set of already queued blocks (to dedupe imports)
  107. // Callbacks
  108. getBlock blockRetrievalFn // Retrieves a block from the local chain
  109. verifyHeader headerVerifierFn // Checks if a block's headers have a valid proof of work
  110. broadcastBlock blockBroadcasterFn // Broadcasts a block to connected peers
  111. chainHeight chainHeightFn // Retrieves the current chain's height
  112. insertChain chainInsertFn // Injects a batch of blocks into the chain
  113. dropPeer peerDropFn // Drops a peer for misbehaving
  114. // Testing hooks
  115. announceChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a hash from the announce list
  116. queueChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a block from the import queue
  117. fetchingHook func([]common.Hash) // Method to call upon starting a block (eth/61) or header (eth/62) fetch
  118. completingHook func([]common.Hash) // Method to call upon starting a block body fetch (eth/62)
  119. importedHook func(*types.Block) // Method to call upon successful block import (both eth/61 and eth/62)
  120. }
  121. // New creates a block fetcher to retrieve blocks based on hash announcements.
  122. func New(getBlock blockRetrievalFn, verifyHeader headerVerifierFn, broadcastBlock blockBroadcasterFn, chainHeight chainHeightFn, insertChain chainInsertFn, dropPeer peerDropFn) *Fetcher {
  123. return &Fetcher{
  124. notify: make(chan *announce),
  125. inject: make(chan *inject),
  126. blockFilter: make(chan chan []*types.Block),
  127. headerFilter: make(chan chan *headerFilterTask),
  128. bodyFilter: make(chan chan *bodyFilterTask),
  129. done: make(chan common.Hash),
  130. quit: make(chan struct{}),
  131. announces: make(map[string]int),
  132. announced: make(map[common.Hash][]*announce),
  133. fetching: make(map[common.Hash]*announce),
  134. fetched: make(map[common.Hash][]*announce),
  135. completing: make(map[common.Hash]*announce),
  136. queue: prque.New(),
  137. queues: make(map[string]int),
  138. queued: make(map[common.Hash]*inject),
  139. getBlock: getBlock,
  140. verifyHeader: verifyHeader,
  141. broadcastBlock: broadcastBlock,
  142. chainHeight: chainHeight,
  143. insertChain: insertChain,
  144. dropPeer: dropPeer,
  145. }
  146. }
  147. // Start boots up the announcement based synchroniser, accepting and processing
  148. // hash notifications and block fetches until termination requested.
  149. func (f *Fetcher) Start() {
  150. go f.loop()
  151. }
  152. // Stop terminates the announcement based synchroniser, canceling all pending
  153. // operations.
  154. func (f *Fetcher) Stop() {
  155. close(f.quit)
  156. }
  157. // Notify announces the fetcher of the potential availability of a new block in
  158. // the network.
  159. func (f *Fetcher) Notify(peer string, hash common.Hash, number uint64, time time.Time,
  160. headerFetcher headerRequesterFn, bodyFetcher bodyRequesterFn) error {
  161. block := &announce{
  162. hash: hash,
  163. number: number,
  164. time: time,
  165. origin: peer,
  166. fetchHeader: headerFetcher,
  167. fetchBodies: bodyFetcher,
  168. }
  169. select {
  170. case f.notify <- block:
  171. return nil
  172. case <-f.quit:
  173. return errTerminated
  174. }
  175. }
  176. // Enqueue tries to fill gaps the the fetcher's future import queue.
  177. func (f *Fetcher) Enqueue(peer string, block *types.Block) error {
  178. op := &inject{
  179. origin: peer,
  180. block: block,
  181. }
  182. select {
  183. case f.inject <- op:
  184. return nil
  185. case <-f.quit:
  186. return errTerminated
  187. }
  188. }
  189. // FilterHeaders extracts all the headers that were explicitly requested by the fetcher,
  190. // returning those that should be handled differently.
  191. func (f *Fetcher) FilterHeaders(peer string, headers []*types.Header, time time.Time) []*types.Header {
  192. log.Trace("Filtering headers", "peer", peer, "headers", len(headers))
  193. // Send the filter channel to the fetcher
  194. filter := make(chan *headerFilterTask)
  195. select {
  196. case f.headerFilter <- filter:
  197. case <-f.quit:
  198. return nil
  199. }
  200. // Request the filtering of the header list
  201. select {
  202. case filter <- &headerFilterTask{peer: peer, headers: headers, time: time}:
  203. case <-f.quit:
  204. return nil
  205. }
  206. // Retrieve the headers remaining after filtering
  207. select {
  208. case task := <-filter:
  209. return task.headers
  210. case <-f.quit:
  211. return nil
  212. }
  213. }
  214. // FilterBodies extracts all the block bodies that were explicitly requested by
  215. // the fetcher, returning those that should be handled differently.
  216. func (f *Fetcher) FilterBodies(peer string, transactions [][]*types.Transaction, uncles [][]*types.Header, time time.Time) ([][]*types.Transaction, [][]*types.Header) {
  217. log.Trace("Filtering bodies", "peer", peer, "txs", len(transactions), "uncles", len(uncles))
  218. // Send the filter channel to the fetcher
  219. filter := make(chan *bodyFilterTask)
  220. select {
  221. case f.bodyFilter <- filter:
  222. case <-f.quit:
  223. return nil, nil
  224. }
  225. // Request the filtering of the body list
  226. select {
  227. case filter <- &bodyFilterTask{peer: peer, transactions: transactions, uncles: uncles, time: time}:
  228. case <-f.quit:
  229. return nil, nil
  230. }
  231. // Retrieve the bodies remaining after filtering
  232. select {
  233. case task := <-filter:
  234. return task.transactions, task.uncles
  235. case <-f.quit:
  236. return nil, nil
  237. }
  238. }
  239. // Loop is the main fetcher loop, checking and processing various notification
  240. // events.
  241. func (f *Fetcher) loop() {
  242. // Iterate the block fetching until a quit is requested
  243. fetchTimer := time.NewTimer(0)
  244. completeTimer := time.NewTimer(0)
  245. for {
  246. // Clean up any expired block fetches
  247. for hash, announce := range f.fetching {
  248. if time.Since(announce.time) > fetchTimeout {
  249. f.forgetHash(hash)
  250. }
  251. }
  252. // Import any queued blocks that could potentially fit
  253. height := f.chainHeight()
  254. for !f.queue.Empty() {
  255. op := f.queue.PopItem().(*inject)
  256. hash := op.block.Hash()
  257. if f.queueChangeHook != nil {
  258. f.queueChangeHook(hash, false)
  259. }
  260. // If too high up the chain or phase, continue later
  261. number := op.block.NumberU64()
  262. if number > height+1 {
  263. f.queue.Push(op, -float32(number))
  264. if f.queueChangeHook != nil {
  265. f.queueChangeHook(hash, true)
  266. }
  267. break
  268. }
  269. // Otherwise if fresh and still unknown, try and import
  270. if number+maxUncleDist < height || f.getBlock(hash) != nil {
  271. f.forgetBlock(hash)
  272. continue
  273. }
  274. f.insert(op.origin, op.block)
  275. }
  276. // Wait for an outside event to occur
  277. select {
  278. case <-f.quit:
  279. // Fetcher terminating, abort all operations
  280. return
  281. case notification := <-f.notify:
  282. // A block was announced, make sure the peer isn't DOSing us
  283. propAnnounceInMeter.Mark(1)
  284. count := f.announces[notification.origin] + 1
  285. if count > hashLimit {
  286. log.Debug("Peer exceeded outstanding announces", "peer", notification.origin, "limit", hashLimit)
  287. propAnnounceDOSMeter.Mark(1)
  288. break
  289. }
  290. // If we have a valid block number, check that it's potentially useful
  291. if notification.number > 0 {
  292. if dist := int64(notification.number) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist {
  293. log.Debug("Peer discarded announcement", "peer", notification.origin, "number", notification.number, "hash", notification.hash, "distance", dist)
  294. propAnnounceDropMeter.Mark(1)
  295. break
  296. }
  297. }
  298. // All is well, schedule the announce if block's not yet downloading
  299. if _, ok := f.fetching[notification.hash]; ok {
  300. break
  301. }
  302. if _, ok := f.completing[notification.hash]; ok {
  303. break
  304. }
  305. f.announces[notification.origin] = count
  306. f.announced[notification.hash] = append(f.announced[notification.hash], notification)
  307. if f.announceChangeHook != nil && len(f.announced[notification.hash]) == 1 {
  308. f.announceChangeHook(notification.hash, true)
  309. }
  310. if len(f.announced) == 1 {
  311. f.rescheduleFetch(fetchTimer)
  312. }
  313. case op := <-f.inject:
  314. // A direct block insertion was requested, try and fill any pending gaps
  315. propBroadcastInMeter.Mark(1)
  316. f.enqueue(op.origin, op.block)
  317. case hash := <-f.done:
  318. // A pending import finished, remove all traces of the notification
  319. f.forgetHash(hash)
  320. f.forgetBlock(hash)
  321. case <-fetchTimer.C:
  322. // At least one block's timer ran out, check for needing retrieval
  323. request := make(map[string][]common.Hash)
  324. for hash, announces := range f.announced {
  325. if time.Since(announces[0].time) > arriveTimeout-gatherSlack {
  326. // Pick a random peer to retrieve from, reset all others
  327. announce := announces[rand.Intn(len(announces))]
  328. f.forgetHash(hash)
  329. // If the block still didn't arrive, queue for fetching
  330. if f.getBlock(hash) == nil {
  331. request[announce.origin] = append(request[announce.origin], hash)
  332. f.fetching[hash] = announce
  333. }
  334. }
  335. }
  336. // Send out all block header requests
  337. for peer, hashes := range request {
  338. log.Trace("Fetching scheduled headers", "peer", peer, "list", hashes)
  339. // Create a closure of the fetch and schedule in on a new thread
  340. fetchHeader, hashes := f.fetching[hashes[0]].fetchHeader, hashes
  341. go func() {
  342. if f.fetchingHook != nil {
  343. f.fetchingHook(hashes)
  344. }
  345. for _, hash := range hashes {
  346. headerFetchMeter.Mark(1)
  347. fetchHeader(hash) // Suboptimal, but protocol doesn't allow batch header retrievals
  348. }
  349. }()
  350. }
  351. // Schedule the next fetch if blocks are still pending
  352. f.rescheduleFetch(fetchTimer)
  353. case <-completeTimer.C:
  354. // At least one header's timer ran out, retrieve everything
  355. request := make(map[string][]common.Hash)
  356. for hash, announces := range f.fetched {
  357. // Pick a random peer to retrieve from, reset all others
  358. announce := announces[rand.Intn(len(announces))]
  359. f.forgetHash(hash)
  360. // If the block still didn't arrive, queue for completion
  361. if f.getBlock(hash) == nil {
  362. request[announce.origin] = append(request[announce.origin], hash)
  363. f.completing[hash] = announce
  364. }
  365. }
  366. // Send out all block body requests
  367. for peer, hashes := range request {
  368. log.Trace("Fetching scheduled bodies", "peer", peer, "list", hashes)
  369. // Create a closure of the fetch and schedule in on a new thread
  370. if f.completingHook != nil {
  371. f.completingHook(hashes)
  372. }
  373. bodyFetchMeter.Mark(int64(len(hashes)))
  374. go f.completing[hashes[0]].fetchBodies(hashes)
  375. }
  376. // Schedule the next fetch if blocks are still pending
  377. f.rescheduleComplete(completeTimer)
  378. case filter := <-f.headerFilter:
  379. // Headers arrived from a remote peer. Extract those that were explicitly
  380. // requested by the fetcher, and return everything else so it's delivered
  381. // to other parts of the system.
  382. var task *headerFilterTask
  383. select {
  384. case task = <-filter:
  385. case <-f.quit:
  386. return
  387. }
  388. headerFilterInMeter.Mark(int64(len(task.headers)))
  389. // Split the batch of headers into unknown ones (to return to the caller),
  390. // known incomplete ones (requiring body retrievals) and completed blocks.
  391. unknown, incomplete, complete := []*types.Header{}, []*announce{}, []*types.Block{}
  392. for _, header := range task.headers {
  393. hash := header.Hash()
  394. // Filter fetcher-requested headers from other synchronisation algorithms
  395. if announce := f.fetching[hash]; announce != nil && announce.origin == task.peer && f.fetched[hash] == nil && f.completing[hash] == nil && f.queued[hash] == nil {
  396. // If the delivered header does not match the promised number, drop the announcer
  397. if header.Number.Uint64() != announce.number {
  398. log.Trace("Invalid block number fetched", "peer", announce.origin, "hash", header.Hash(), "announced", announce.number, "provided", header.Number)
  399. f.dropPeer(announce.origin)
  400. f.forgetHash(hash)
  401. continue
  402. }
  403. // Only keep if not imported by other means
  404. if f.getBlock(hash) == nil {
  405. announce.header = header
  406. announce.time = task.time
  407. // If the block is empty (header only), short circuit into the final import queue
  408. if header.TxHash == types.DeriveSha(types.Transactions{}) && header.UncleHash == types.CalcUncleHash([]*types.Header{}) {
  409. log.Trace("Block empty, skipping body retrieval", "peer", announce.origin, "number", header.Number, "hash", header.Hash())
  410. block := types.NewBlockWithHeader(header)
  411. block.ReceivedAt = task.time
  412. complete = append(complete, block)
  413. f.completing[hash] = announce
  414. continue
  415. }
  416. // Otherwise add to the list of blocks needing completion
  417. incomplete = append(incomplete, announce)
  418. } else {
  419. log.Trace("Block already imported, discarding header", "peer", announce.origin, "number", header.Number, "hash", header.Hash())
  420. f.forgetHash(hash)
  421. }
  422. } else {
  423. // Fetcher doesn't know about it, add to the return list
  424. unknown = append(unknown, header)
  425. }
  426. }
  427. headerFilterOutMeter.Mark(int64(len(unknown)))
  428. select {
  429. case filter <- &headerFilterTask{headers: unknown, time: task.time}:
  430. case <-f.quit:
  431. return
  432. }
  433. // Schedule the retrieved headers for body completion
  434. for _, announce := range incomplete {
  435. hash := announce.header.Hash()
  436. if _, ok := f.completing[hash]; ok {
  437. continue
  438. }
  439. f.fetched[hash] = append(f.fetched[hash], announce)
  440. if len(f.fetched) == 1 {
  441. f.rescheduleComplete(completeTimer)
  442. }
  443. }
  444. // Schedule the header-only blocks for import
  445. for _, block := range complete {
  446. if announce := f.completing[block.Hash()]; announce != nil {
  447. f.enqueue(announce.origin, block)
  448. }
  449. }
  450. case filter := <-f.bodyFilter:
  451. // Block bodies arrived, extract any explicitly requested blocks, return the rest
  452. var task *bodyFilterTask
  453. select {
  454. case task = <-filter:
  455. case <-f.quit:
  456. return
  457. }
  458. bodyFilterInMeter.Mark(int64(len(task.transactions)))
  459. blocks := []*types.Block{}
  460. for i := 0; i < len(task.transactions) && i < len(task.uncles); i++ {
  461. // Match up a body to any possible completion request
  462. matched := false
  463. for hash, announce := range f.completing {
  464. if f.queued[hash] == nil {
  465. txnHash := types.DeriveSha(types.Transactions(task.transactions[i]))
  466. uncleHash := types.CalcUncleHash(task.uncles[i])
  467. if txnHash == announce.header.TxHash && uncleHash == announce.header.UncleHash && announce.origin == task.peer {
  468. // Mark the body matched, reassemble if still unknown
  469. matched = true
  470. if f.getBlock(hash) == nil {
  471. block := types.NewBlockWithHeader(announce.header).WithBody(task.transactions[i], task.uncles[i])
  472. block.ReceivedAt = task.time
  473. blocks = append(blocks, block)
  474. } else {
  475. f.forgetHash(hash)
  476. }
  477. }
  478. }
  479. }
  480. if matched {
  481. task.transactions = append(task.transactions[:i], task.transactions[i+1:]...)
  482. task.uncles = append(task.uncles[:i], task.uncles[i+1:]...)
  483. i--
  484. continue
  485. }
  486. }
  487. bodyFilterOutMeter.Mark(int64(len(task.transactions)))
  488. select {
  489. case filter <- task:
  490. case <-f.quit:
  491. return
  492. }
  493. // Schedule the retrieved blocks for ordered import
  494. for _, block := range blocks {
  495. if announce := f.completing[block.Hash()]; announce != nil {
  496. f.enqueue(announce.origin, block)
  497. }
  498. }
  499. }
  500. }
  501. }
  502. // rescheduleFetch resets the specified fetch timer to the next announce timeout.
  503. func (f *Fetcher) rescheduleFetch(fetch *time.Timer) {
  504. // Short circuit if no blocks are announced
  505. if len(f.announced) == 0 {
  506. return
  507. }
  508. // Otherwise find the earliest expiring announcement
  509. earliest := time.Now()
  510. for _, announces := range f.announced {
  511. if earliest.After(announces[0].time) {
  512. earliest = announces[0].time
  513. }
  514. }
  515. fetch.Reset(arriveTimeout - time.Since(earliest))
  516. }
  517. // rescheduleComplete resets the specified completion timer to the next fetch timeout.
  518. func (f *Fetcher) rescheduleComplete(complete *time.Timer) {
  519. // Short circuit if no headers are fetched
  520. if len(f.fetched) == 0 {
  521. return
  522. }
  523. // Otherwise find the earliest expiring announcement
  524. earliest := time.Now()
  525. for _, announces := range f.fetched {
  526. if earliest.After(announces[0].time) {
  527. earliest = announces[0].time
  528. }
  529. }
  530. complete.Reset(gatherSlack - time.Since(earliest))
  531. }
  532. // enqueue schedules a new future import operation, if the block to be imported
  533. // has not yet been seen.
  534. func (f *Fetcher) enqueue(peer string, block *types.Block) {
  535. hash := block.Hash()
  536. // Ensure the peer isn't DOSing us
  537. count := f.queues[peer] + 1
  538. if count > blockLimit {
  539. log.Debug("Discarded propagated block, exceeded allowance", "peer", peer, "number", block.Number(), "hash", hash, "limit", blockLimit)
  540. propBroadcastDOSMeter.Mark(1)
  541. f.forgetHash(hash)
  542. return
  543. }
  544. // Discard any past or too distant blocks
  545. if dist := int64(block.NumberU64()) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist {
  546. log.Debug("Discarded propagated block, too far away", "peer", peer, "number", block.Number(), "hash", hash, "distance", dist)
  547. propBroadcastDropMeter.Mark(1)
  548. f.forgetHash(hash)
  549. return
  550. }
  551. // Schedule the block for future importing
  552. if _, ok := f.queued[hash]; !ok {
  553. op := &inject{
  554. origin: peer,
  555. block: block,
  556. }
  557. f.queues[peer] = count
  558. f.queued[hash] = op
  559. f.queue.Push(op, -float32(block.NumberU64()))
  560. if f.queueChangeHook != nil {
  561. f.queueChangeHook(op.block.Hash(), true)
  562. }
  563. log.Debug("Queued propagated block", "peer", peer, "number", block.Number(), "hash", hash, "queued", f.queue.Size())
  564. }
  565. }
  566. // insert spawns a new goroutine to run a block insertion into the chain. If the
  567. // block's number is at the same height as the current import phase, it updates
  568. // the phase states accordingly.
  569. func (f *Fetcher) insert(peer string, block *types.Block) {
  570. hash := block.Hash()
  571. // Run the import on a new thread
  572. log.Debug("Importing propagated block", "peer", peer, "number", block.Number(), "hash", hash)
  573. go func() {
  574. defer func() { f.done <- hash }()
  575. // If the parent's unknown, abort insertion
  576. parent := f.getBlock(block.ParentHash())
  577. if parent == nil {
  578. log.Debug("Unknown parent of propagated block", "peer", peer, "number", block.Number(), "hash", hash, "parent", block.ParentHash())
  579. return
  580. }
  581. // Quickly validate the header and propagate the block if it passes
  582. switch err := f.verifyHeader(block.Header()); err {
  583. case nil:
  584. // All ok, quickly propagate to our peers
  585. propBroadcastOutTimer.UpdateSince(block.ReceivedAt)
  586. go f.broadcastBlock(block, true)
  587. case consensus.ErrFutureBlock:
  588. // Weird future block, don't fail, but neither propagate
  589. default:
  590. // Something went very wrong, drop the peer
  591. log.Debug("Propagated block verification failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err)
  592. f.dropPeer(peer)
  593. return
  594. }
  595. // Run the actual import and log any issues
  596. if _, err := f.insertChain(types.Blocks{block}); err != nil {
  597. log.Debug("Propagated block import failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err)
  598. return
  599. }
  600. // If import succeeded, broadcast the block
  601. propAnnounceOutTimer.UpdateSince(block.ReceivedAt)
  602. go f.broadcastBlock(block, false)
  603. // Invoke the testing hook if needed
  604. if f.importedHook != nil {
  605. f.importedHook(block)
  606. }
  607. }()
  608. }
  609. // forgetHash removes all traces of a block announcement from the fetcher's
  610. // internal state.
  611. func (f *Fetcher) forgetHash(hash common.Hash) {
  612. // Remove all pending announces and decrement DOS counters
  613. for _, announce := range f.announced[hash] {
  614. f.announces[announce.origin]--
  615. if f.announces[announce.origin] == 0 {
  616. delete(f.announces, announce.origin)
  617. }
  618. }
  619. delete(f.announced, hash)
  620. if f.announceChangeHook != nil {
  621. f.announceChangeHook(hash, false)
  622. }
  623. // Remove any pending fetches and decrement the DOS counters
  624. if announce := f.fetching[hash]; announce != nil {
  625. f.announces[announce.origin]--
  626. if f.announces[announce.origin] == 0 {
  627. delete(f.announces, announce.origin)
  628. }
  629. delete(f.fetching, hash)
  630. }
  631. // Remove any pending completion requests and decrement the DOS counters
  632. for _, announce := range f.fetched[hash] {
  633. f.announces[announce.origin]--
  634. if f.announces[announce.origin] == 0 {
  635. delete(f.announces, announce.origin)
  636. }
  637. }
  638. delete(f.fetched, hash)
  639. // Remove any pending completions and decrement the DOS counters
  640. if announce := f.completing[hash]; announce != nil {
  641. f.announces[announce.origin]--
  642. if f.announces[announce.origin] == 0 {
  643. delete(f.announces, announce.origin)
  644. }
  645. delete(f.completing, hash)
  646. }
  647. }
  648. // forgetBlock removes all traces of a queued block from the fetcher's internal
  649. // state.
  650. func (f *Fetcher) forgetBlock(hash common.Hash) {
  651. if insert := f.queued[hash]; insert != nil {
  652. f.queues[insert.origin]--
  653. if f.queues[insert.origin] == 0 {
  654. delete(f.queues, insert.origin)
  655. }
  656. delete(f.queued, hash)
  657. }
  658. }