12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737 |
- // Copyright 2015 The go-ethereum Authors
- // This file is part of the go-ethereum library.
- //
- // The go-ethereum library is free software: you can redistribute it and/or modify
- // it under the terms of the GNU Lesser General Public License as published by
- // the Free Software Foundation, either version 3 of the License, or
- // (at your option) any later version.
- //
- // The go-ethereum library is distributed in the hope that it will be useful,
- // but WITHOUT ANY WARRANTY; without even the implied warranty of
- // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- // GNU Lesser General Public License for more details.
- //
- // You should have received a copy of the GNU Lesser General Public License
- // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
- // Package fetcher contains the block announcement based synchronisation.
- package fetcher
- import (
- "errors"
- "math/rand"
- "time"
- "github.com/ethereum/go-ethereum/common"
- "github.com/ethereum/go-ethereum/consensus"
- "github.com/ethereum/go-ethereum/core/types"
- "github.com/ethereum/go-ethereum/log"
- "gopkg.in/karalabe/cookiejar.v2/collections/prque"
- )
// Timing allowances and size limits used by the fetcher.
const (
	// arriveTimeout is the time allowance before an announced block is
	// explicitly requested.
	arriveTimeout = 500 * time.Millisecond

	// gatherSlack is the interval used to collate almost-expired announces
	// with fetches.
	gatherSlack = 100 * time.Millisecond

	// fetchTimeout is the maximum allotted time to return an explicitly
	// requested block.
	fetchTimeout = 5 * time.Second

	// maxUncleDist is the maximum allowed backward distance from the chain head.
	maxUncleDist = 7

	// maxQueueDist is the maximum allowed distance from the chain head to queue.
	maxQueueDist = 32

	// hashLimit is the maximum number of unique blocks a peer may have announced.
	hashLimit = 256

	// blockLimit is the maximum number of unique blocks a peer may have delivered.
	blockLimit = 64
)
// errTerminated is reported by Notify and Enqueue when the fetcher's quit
// channel is closed before the request could be handed to the main loop.
var errTerminated = errors.New("terminated")
- // blockRetrievalFn is a callback type for retrieving a block from the local chain.
- type blockRetrievalFn func(common.Hash) *types.Block
- // headerRequesterFn is a callback type for sending a header retrieval request.
- type headerRequesterFn func(common.Hash) error
- // bodyRequesterFn is a callback type for sending a body retrieval request.
- type bodyRequesterFn func([]common.Hash) error
- // headerVerifierFn is a callback type to verify a block's header for fast propagation.
- type headerVerifierFn func(header *types.Header) error
- // blockBroadcasterFn is a callback type for broadcasting a block to connected peers.
- type blockBroadcasterFn func(block *types.Block, propagate bool)
- // chainHeightFn is a callback type to retrieve the current chain height.
- type chainHeightFn func() uint64
- // chainInsertFn is a callback type to insert a batch of blocks into the local chain.
- type chainInsertFn func(types.Blocks) (int, error)
- // peerDropFn is a callback type for dropping a peer detected as malicious.
- type peerDropFn func(id string)
- // announce is the hash notification of the availability of a new block in the
- // network.
- type announce struct {
- hash common.Hash // Hash of the block being announced
- number uint64 // Number of the block being announced (0 = unknown | old protocol)
- header *types.Header // Header of the block partially reassembled (new protocol)
- time time.Time // Timestamp of the announcement
- origin string // Identifier of the peer originating the notification
- fetchHeader headerRequesterFn // Fetcher function to retrieve the header of an announced block
- fetchBodies bodyRequesterFn // Fetcher function to retrieve the body of an announced block
- }
- // headerFilterTask represents a batch of headers needing fetcher filtering.
- type headerFilterTask struct {
- peer string // The source peer of block headers
- headers []*types.Header // Collection of headers to filter
- time time.Time // Arrival time of the headers
- }
- // headerFilterTask represents a batch of block bodies (transactions and uncles)
- // needing fetcher filtering.
- type bodyFilterTask struct {
- peer string // The source peer of block bodies
- transactions [][]*types.Transaction // Collection of transactions per block bodies
- uncles [][]*types.Header // Collection of uncles per block bodies
- time time.Time // Arrival time of the blocks' contents
- }
- // inject represents a schedules import operation.
- type inject struct {
- origin string
- block *types.Block
- }
- // Fetcher is responsible for accumulating block announcements from various peers
- // and scheduling them for retrieval.
- type Fetcher struct {
- // Various event channels
- notify chan *announce
- inject chan *inject
- blockFilter chan chan []*types.Block
- headerFilter chan chan *headerFilterTask
- bodyFilter chan chan *bodyFilterTask
- done chan common.Hash
- quit chan struct{}
- // Announce states
- announces map[string]int // Per peer announce counts to prevent memory exhaustion
- announced map[common.Hash][]*announce // Announced blocks, scheduled for fetching
- fetching map[common.Hash]*announce // Announced blocks, currently fetching
- fetched map[common.Hash][]*announce // Blocks with headers fetched, scheduled for body retrieval
- completing map[common.Hash]*announce // Blocks with headers, currently body-completing
- // Block cache
- queue *prque.Prque // Queue containing the import operations (block number sorted)
- queues map[string]int // Per peer block counts to prevent memory exhaustion
- queued map[common.Hash]*inject // Set of already queued blocks (to dedupe imports)
- // Callbacks
- getBlock blockRetrievalFn // Retrieves a block from the local chain
- verifyHeader headerVerifierFn // Checks if a block's headers have a valid proof of work
- broadcastBlock blockBroadcasterFn // Broadcasts a block to connected peers
- chainHeight chainHeightFn // Retrieves the current chain's height
- insertChain chainInsertFn // Injects a batch of blocks into the chain
- dropPeer peerDropFn // Drops a peer for misbehaving
- // Testing hooks
- announceChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a hash from the announce list
- queueChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a block from the import queue
- fetchingHook func([]common.Hash) // Method to call upon starting a block (eth/61) or header (eth/62) fetch
- completingHook func([]common.Hash) // Method to call upon starting a block body fetch (eth/62)
- importedHook func(*types.Block) // Method to call upon successful block import (both eth/61 and eth/62)
- }
- // New creates a block fetcher to retrieve blocks based on hash announcements.
- func New(getBlock blockRetrievalFn, verifyHeader headerVerifierFn, broadcastBlock blockBroadcasterFn, chainHeight chainHeightFn, insertChain chainInsertFn, dropPeer peerDropFn) *Fetcher {
- return &Fetcher{
- notify: make(chan *announce),
- inject: make(chan *inject),
- blockFilter: make(chan chan []*types.Block),
- headerFilter: make(chan chan *headerFilterTask),
- bodyFilter: make(chan chan *bodyFilterTask),
- done: make(chan common.Hash),
- quit: make(chan struct{}),
- announces: make(map[string]int),
- announced: make(map[common.Hash][]*announce),
- fetching: make(map[common.Hash]*announce),
- fetched: make(map[common.Hash][]*announce),
- completing: make(map[common.Hash]*announce),
- queue: prque.New(),
- queues: make(map[string]int),
- queued: make(map[common.Hash]*inject),
- getBlock: getBlock,
- verifyHeader: verifyHeader,
- broadcastBlock: broadcastBlock,
- chainHeight: chainHeight,
- insertChain: insertChain,
- dropPeer: dropPeer,
- }
- }
- // Start boots up the announcement based synchroniser, accepting and processing
- // hash notifications and block fetches until termination requested.
- func (f *Fetcher) Start() {
- go f.loop()
- }
- // Stop terminates the announcement based synchroniser, canceling all pending
- // operations.
- func (f *Fetcher) Stop() {
- close(f.quit)
- }
- // Notify announces the fetcher of the potential availability of a new block in
- // the network.
- func (f *Fetcher) Notify(peer string, hash common.Hash, number uint64, time time.Time,
- headerFetcher headerRequesterFn, bodyFetcher bodyRequesterFn) error {
- block := &announce{
- hash: hash,
- number: number,
- time: time,
- origin: peer,
- fetchHeader: headerFetcher,
- fetchBodies: bodyFetcher,
- }
- select {
- case f.notify <- block:
- return nil
- case <-f.quit:
- return errTerminated
- }
- }
- // Enqueue tries to fill gaps the the fetcher's future import queue.
- func (f *Fetcher) Enqueue(peer string, block *types.Block) error {
- op := &inject{
- origin: peer,
- block: block,
- }
- select {
- case f.inject <- op:
- return nil
- case <-f.quit:
- return errTerminated
- }
- }
- // FilterHeaders extracts all the headers that were explicitly requested by the fetcher,
- // returning those that should be handled differently.
- func (f *Fetcher) FilterHeaders(peer string, headers []*types.Header, time time.Time) []*types.Header {
- log.Trace("Filtering headers", "peer", peer, "headers", len(headers))
- // Send the filter channel to the fetcher
- filter := make(chan *headerFilterTask)
- select {
- case f.headerFilter <- filter:
- case <-f.quit:
- return nil
- }
- // Request the filtering of the header list
- select {
- case filter <- &headerFilterTask{peer: peer, headers: headers, time: time}:
- case <-f.quit:
- return nil
- }
- // Retrieve the headers remaining after filtering
- select {
- case task := <-filter:
- return task.headers
- case <-f.quit:
- return nil
- }
- }
- // FilterBodies extracts all the block bodies that were explicitly requested by
- // the fetcher, returning those that should be handled differently.
- func (f *Fetcher) FilterBodies(peer string, transactions [][]*types.Transaction, uncles [][]*types.Header, time time.Time) ([][]*types.Transaction, [][]*types.Header) {
- log.Trace("Filtering bodies", "peer", peer, "txs", len(transactions), "uncles", len(uncles))
- // Send the filter channel to the fetcher
- filter := make(chan *bodyFilterTask)
- select {
- case f.bodyFilter <- filter:
- case <-f.quit:
- return nil, nil
- }
- // Request the filtering of the body list
- select {
- case filter <- &bodyFilterTask{peer: peer, transactions: transactions, uncles: uncles, time: time}:
- case <-f.quit:
- return nil, nil
- }
- // Retrieve the bodies remaining after filtering
- select {
- case task := <-filter:
- return task.transactions, task.uncles
- case <-f.quit:
- return nil, nil
- }
- }
- // Loop is the main fetcher loop, checking and processing various notification
- // events.
- func (f *Fetcher) loop() {
- // Iterate the block fetching until a quit is requested
- fetchTimer := time.NewTimer(0)
- completeTimer := time.NewTimer(0)
- for {
- // Clean up any expired block fetches
- for hash, announce := range f.fetching {
- if time.Since(announce.time) > fetchTimeout {
- f.forgetHash(hash)
- }
- }
- // Import any queued blocks that could potentially fit
- height := f.chainHeight()
- for !f.queue.Empty() {
- op := f.queue.PopItem().(*inject)
- hash := op.block.Hash()
- if f.queueChangeHook != nil {
- f.queueChangeHook(hash, false)
- }
- // If too high up the chain or phase, continue later
- number := op.block.NumberU64()
- if number > height+1 {
- f.queue.Push(op, -float32(number))
- if f.queueChangeHook != nil {
- f.queueChangeHook(hash, true)
- }
- break
- }
- // Otherwise if fresh and still unknown, try and import
- if number+maxUncleDist < height || f.getBlock(hash) != nil {
- f.forgetBlock(hash)
- continue
- }
- f.insert(op.origin, op.block)
- }
- // Wait for an outside event to occur
- select {
- case <-f.quit:
- // Fetcher terminating, abort all operations
- return
- case notification := <-f.notify:
- // A block was announced, make sure the peer isn't DOSing us
- propAnnounceInMeter.Mark(1)
- count := f.announces[notification.origin] + 1
- if count > hashLimit {
- log.Debug("Peer exceeded outstanding announces", "peer", notification.origin, "limit", hashLimit)
- propAnnounceDOSMeter.Mark(1)
- break
- }
- // If we have a valid block number, check that it's potentially useful
- if notification.number > 0 {
- if dist := int64(notification.number) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist {
- log.Debug("Peer discarded announcement", "peer", notification.origin, "number", notification.number, "hash", notification.hash, "distance", dist)
- propAnnounceDropMeter.Mark(1)
- break
- }
- }
- // All is well, schedule the announce if block's not yet downloading
- if _, ok := f.fetching[notification.hash]; ok {
- break
- }
- if _, ok := f.completing[notification.hash]; ok {
- break
- }
- f.announces[notification.origin] = count
- f.announced[notification.hash] = append(f.announced[notification.hash], notification)
- if f.announceChangeHook != nil && len(f.announced[notification.hash]) == 1 {
- f.announceChangeHook(notification.hash, true)
- }
- if len(f.announced) == 1 {
- f.rescheduleFetch(fetchTimer)
- }
- case op := <-f.inject:
- // A direct block insertion was requested, try and fill any pending gaps
- propBroadcastInMeter.Mark(1)
- f.enqueue(op.origin, op.block)
- case hash := <-f.done:
- // A pending import finished, remove all traces of the notification
- f.forgetHash(hash)
- f.forgetBlock(hash)
- case <-fetchTimer.C:
- // At least one block's timer ran out, check for needing retrieval
- request := make(map[string][]common.Hash)
- for hash, announces := range f.announced {
- if time.Since(announces[0].time) > arriveTimeout-gatherSlack {
- // Pick a random peer to retrieve from, reset all others
- announce := announces[rand.Intn(len(announces))]
- f.forgetHash(hash)
- // If the block still didn't arrive, queue for fetching
- if f.getBlock(hash) == nil {
- request[announce.origin] = append(request[announce.origin], hash)
- f.fetching[hash] = announce
- }
- }
- }
- // Send out all block header requests
- for peer, hashes := range request {
- log.Trace("Fetching scheduled headers", "peer", peer, "list", hashes)
- // Create a closure of the fetch and schedule in on a new thread
- fetchHeader, hashes := f.fetching[hashes[0]].fetchHeader, hashes
- go func() {
- if f.fetchingHook != nil {
- f.fetchingHook(hashes)
- }
- for _, hash := range hashes {
- headerFetchMeter.Mark(1)
- fetchHeader(hash) // Suboptimal, but protocol doesn't allow batch header retrievals
- }
- }()
- }
- // Schedule the next fetch if blocks are still pending
- f.rescheduleFetch(fetchTimer)
- case <-completeTimer.C:
- // At least one header's timer ran out, retrieve everything
- request := make(map[string][]common.Hash)
- for hash, announces := range f.fetched {
- // Pick a random peer to retrieve from, reset all others
- announce := announces[rand.Intn(len(announces))]
- f.forgetHash(hash)
- // If the block still didn't arrive, queue for completion
- if f.getBlock(hash) == nil {
- request[announce.origin] = append(request[announce.origin], hash)
- f.completing[hash] = announce
- }
- }
- // Send out all block body requests
- for peer, hashes := range request {
- log.Trace("Fetching scheduled bodies", "peer", peer, "list", hashes)
- // Create a closure of the fetch and schedule in on a new thread
- if f.completingHook != nil {
- f.completingHook(hashes)
- }
- bodyFetchMeter.Mark(int64(len(hashes)))
- go f.completing[hashes[0]].fetchBodies(hashes)
- }
- // Schedule the next fetch if blocks are still pending
- f.rescheduleComplete(completeTimer)
- case filter := <-f.headerFilter:
- // Headers arrived from a remote peer. Extract those that were explicitly
- // requested by the fetcher, and return everything else so it's delivered
- // to other parts of the system.
- var task *headerFilterTask
- select {
- case task = <-filter:
- case <-f.quit:
- return
- }
- headerFilterInMeter.Mark(int64(len(task.headers)))
- // Split the batch of headers into unknown ones (to return to the caller),
- // known incomplete ones (requiring body retrievals) and completed blocks.
- unknown, incomplete, complete := []*types.Header{}, []*announce{}, []*types.Block{}
- for _, header := range task.headers {
- hash := header.Hash()
- // Filter fetcher-requested headers from other synchronisation algorithms
- if announce := f.fetching[hash]; announce != nil && announce.origin == task.peer && f.fetched[hash] == nil && f.completing[hash] == nil && f.queued[hash] == nil {
- // If the delivered header does not match the promised number, drop the announcer
- if header.Number.Uint64() != announce.number {
- log.Trace("Invalid block number fetched", "peer", announce.origin, "hash", header.Hash(), "announced", announce.number, "provided", header.Number)
- f.dropPeer(announce.origin)
- f.forgetHash(hash)
- continue
- }
- // Only keep if not imported by other means
- if f.getBlock(hash) == nil {
- announce.header = header
- announce.time = task.time
- // If the block is empty (header only), short circuit into the final import queue
- if header.TxHash == types.DeriveSha(types.Transactions{}) && header.UncleHash == types.CalcUncleHash([]*types.Header{}) {
- log.Trace("Block empty, skipping body retrieval", "peer", announce.origin, "number", header.Number, "hash", header.Hash())
- block := types.NewBlockWithHeader(header)
- block.ReceivedAt = task.time
- complete = append(complete, block)
- f.completing[hash] = announce
- continue
- }
- // Otherwise add to the list of blocks needing completion
- incomplete = append(incomplete, announce)
- } else {
- log.Trace("Block already imported, discarding header", "peer", announce.origin, "number", header.Number, "hash", header.Hash())
- f.forgetHash(hash)
- }
- } else {
- // Fetcher doesn't know about it, add to the return list
- unknown = append(unknown, header)
- }
- }
- headerFilterOutMeter.Mark(int64(len(unknown)))
- select {
- case filter <- &headerFilterTask{headers: unknown, time: task.time}:
- case <-f.quit:
- return
- }
- // Schedule the retrieved headers for body completion
- for _, announce := range incomplete {
- hash := announce.header.Hash()
- if _, ok := f.completing[hash]; ok {
- continue
- }
- f.fetched[hash] = append(f.fetched[hash], announce)
- if len(f.fetched) == 1 {
- f.rescheduleComplete(completeTimer)
- }
- }
- // Schedule the header-only blocks for import
- for _, block := range complete {
- if announce := f.completing[block.Hash()]; announce != nil {
- f.enqueue(announce.origin, block)
- }
- }
- case filter := <-f.bodyFilter:
- // Block bodies arrived, extract any explicitly requested blocks, return the rest
- var task *bodyFilterTask
- select {
- case task = <-filter:
- case <-f.quit:
- return
- }
- bodyFilterInMeter.Mark(int64(len(task.transactions)))
- blocks := []*types.Block{}
- for i := 0; i < len(task.transactions) && i < len(task.uncles); i++ {
- // Match up a body to any possible completion request
- matched := false
- for hash, announce := range f.completing {
- if f.queued[hash] == nil {
- txnHash := types.DeriveSha(types.Transactions(task.transactions[i]))
- uncleHash := types.CalcUncleHash(task.uncles[i])
- if txnHash == announce.header.TxHash && uncleHash == announce.header.UncleHash && announce.origin == task.peer {
- // Mark the body matched, reassemble if still unknown
- matched = true
- if f.getBlock(hash) == nil {
- block := types.NewBlockWithHeader(announce.header).WithBody(task.transactions[i], task.uncles[i])
- block.ReceivedAt = task.time
- blocks = append(blocks, block)
- } else {
- f.forgetHash(hash)
- }
- }
- }
- }
- if matched {
- task.transactions = append(task.transactions[:i], task.transactions[i+1:]...)
- task.uncles = append(task.uncles[:i], task.uncles[i+1:]...)
- i--
- continue
- }
- }
- bodyFilterOutMeter.Mark(int64(len(task.transactions)))
- select {
- case filter <- task:
- case <-f.quit:
- return
- }
- // Schedule the retrieved blocks for ordered import
- for _, block := range blocks {
- if announce := f.completing[block.Hash()]; announce != nil {
- f.enqueue(announce.origin, block)
- }
- }
- }
- }
- }
- // rescheduleFetch resets the specified fetch timer to the next announce timeout.
- func (f *Fetcher) rescheduleFetch(fetch *time.Timer) {
- // Short circuit if no blocks are announced
- if len(f.announced) == 0 {
- return
- }
- // Otherwise find the earliest expiring announcement
- earliest := time.Now()
- for _, announces := range f.announced {
- if earliest.After(announces[0].time) {
- earliest = announces[0].time
- }
- }
- fetch.Reset(arriveTimeout - time.Since(earliest))
- }
- // rescheduleComplete resets the specified completion timer to the next fetch timeout.
- func (f *Fetcher) rescheduleComplete(complete *time.Timer) {
- // Short circuit if no headers are fetched
- if len(f.fetched) == 0 {
- return
- }
- // Otherwise find the earliest expiring announcement
- earliest := time.Now()
- for _, announces := range f.fetched {
- if earliest.After(announces[0].time) {
- earliest = announces[0].time
- }
- }
- complete.Reset(gatherSlack - time.Since(earliest))
- }
- // enqueue schedules a new future import operation, if the block to be imported
- // has not yet been seen.
- func (f *Fetcher) enqueue(peer string, block *types.Block) {
- hash := block.Hash()
- // Ensure the peer isn't DOSing us
- count := f.queues[peer] + 1
- if count > blockLimit {
- log.Debug("Discarded propagated block, exceeded allowance", "peer", peer, "number", block.Number(), "hash", hash, "limit", blockLimit)
- propBroadcastDOSMeter.Mark(1)
- f.forgetHash(hash)
- return
- }
- // Discard any past or too distant blocks
- if dist := int64(block.NumberU64()) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist {
- log.Debug("Discarded propagated block, too far away", "peer", peer, "number", block.Number(), "hash", hash, "distance", dist)
- propBroadcastDropMeter.Mark(1)
- f.forgetHash(hash)
- return
- }
- // Schedule the block for future importing
- if _, ok := f.queued[hash]; !ok {
- op := &inject{
- origin: peer,
- block: block,
- }
- f.queues[peer] = count
- f.queued[hash] = op
- f.queue.Push(op, -float32(block.NumberU64()))
- if f.queueChangeHook != nil {
- f.queueChangeHook(op.block.Hash(), true)
- }
- log.Debug("Queued propagated block", "peer", peer, "number", block.Number(), "hash", hash, "queued", f.queue.Size())
- }
- }
- // insert spawns a new goroutine to run a block insertion into the chain. If the
- // block's number is at the same height as the current import phase, it updates
- // the phase states accordingly.
- func (f *Fetcher) insert(peer string, block *types.Block) {
- hash := block.Hash()
- // Run the import on a new thread
- log.Debug("Importing propagated block", "peer", peer, "number", block.Number(), "hash", hash)
- go func() {
- defer func() { f.done <- hash }()
- // If the parent's unknown, abort insertion
- parent := f.getBlock(block.ParentHash())
- if parent == nil {
- log.Debug("Unknown parent of propagated block", "peer", peer, "number", block.Number(), "hash", hash, "parent", block.ParentHash())
- return
- }
- // Quickly validate the header and propagate the block if it passes
- switch err := f.verifyHeader(block.Header()); err {
- case nil:
- // All ok, quickly propagate to our peers
- propBroadcastOutTimer.UpdateSince(block.ReceivedAt)
- go f.broadcastBlock(block, true)
- case consensus.ErrFutureBlock:
- // Weird future block, don't fail, but neither propagate
- default:
- // Something went very wrong, drop the peer
- log.Debug("Propagated block verification failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err)
- f.dropPeer(peer)
- return
- }
- // Run the actual import and log any issues
- if _, err := f.insertChain(types.Blocks{block}); err != nil {
- log.Debug("Propagated block import failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err)
- return
- }
- // If import succeeded, broadcast the block
- propAnnounceOutTimer.UpdateSince(block.ReceivedAt)
- go f.broadcastBlock(block, false)
- // Invoke the testing hook if needed
- if f.importedHook != nil {
- f.importedHook(block)
- }
- }()
- }
- // forgetHash removes all traces of a block announcement from the fetcher's
- // internal state.
- func (f *Fetcher) forgetHash(hash common.Hash) {
- // Remove all pending announces and decrement DOS counters
- for _, announce := range f.announced[hash] {
- f.announces[announce.origin]--
- if f.announces[announce.origin] == 0 {
- delete(f.announces, announce.origin)
- }
- }
- delete(f.announced, hash)
- if f.announceChangeHook != nil {
- f.announceChangeHook(hash, false)
- }
- // Remove any pending fetches and decrement the DOS counters
- if announce := f.fetching[hash]; announce != nil {
- f.announces[announce.origin]--
- if f.announces[announce.origin] == 0 {
- delete(f.announces, announce.origin)
- }
- delete(f.fetching, hash)
- }
- // Remove any pending completion requests and decrement the DOS counters
- for _, announce := range f.fetched[hash] {
- f.announces[announce.origin]--
- if f.announces[announce.origin] == 0 {
- delete(f.announces, announce.origin)
- }
- }
- delete(f.fetched, hash)
- // Remove any pending completions and decrement the DOS counters
- if announce := f.completing[hash]; announce != nil {
- f.announces[announce.origin]--
- if f.announces[announce.origin] == 0 {
- delete(f.announces, announce.origin)
- }
- delete(f.completing, hash)
- }
- }
- // forgetBlock removes all traces of a queued block from the fetcher's internal
- // state.
- func (f *Fetcher) forgetBlock(hash common.Hash) {
- if insert := f.queued[hash]; insert != nil {
- f.queues[insert.origin]--
- if f.queues[insert.origin] == 0 {
- delete(f.queues, insert.origin)
- }
- delete(f.queued, hash)
- }
- }
|