fetcher.go 23 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. // Package les implements the Light Ethereum Subprotocol.
  17. package les
  18. import (
  19. "math/big"
  20. "sync"
  21. "time"
  22. "github.com/ethereum/go-ethereum/common"
  23. "github.com/ethereum/go-ethereum/common/mclock"
  24. "github.com/ethereum/go-ethereum/consensus"
  25. "github.com/ethereum/go-ethereum/core/rawdb"
  26. "github.com/ethereum/go-ethereum/core/types"
  27. "github.com/ethereum/go-ethereum/light"
  28. "github.com/ethereum/go-ethereum/log"
  29. )
  // Tunable limits of the light fetcher.
  const (
  	// blockDelayTimeout is the timeout for a peer to announce a head that has
  	// already been confirmed by others.
  	blockDelayTimeout = 10 * time.Second

  	// maxNodeCount is the maximum number of fetcherTreeNode entries remembered
  	// for each peer.
  	maxNodeCount = 20
  )
  34. // lightFetcher implements retrieval of newly announced headers. It also provides a peerHasBlock function for the
  35. // ODR system to ensure that we only request data related to a certain block from peers who have already processed
  36. // and announced that block.
  37. type lightFetcher struct {
  38. pm *ProtocolManager
  39. odr *LesOdr
  40. chain *light.LightChain
  41. lock sync.Mutex // lock protects access to the fetcher's internal state variables except sent requests
  42. maxConfirmedTd *big.Int
  43. peers map[*peer]*fetcherPeerInfo
  44. lastUpdateStats *updateStatsEntry
  45. syncing bool
  46. syncDone chan *peer
  47. reqMu sync.RWMutex // reqMu protects access to sent header fetch requests
  48. requested map[uint64]fetchRequest
  49. deliverChn chan fetchResponse
  50. timeoutChn chan uint64
  51. requestChn chan bool // true if initiated from outside
  52. }
  53. // fetcherPeerInfo holds fetcher-specific information about each active peer
  54. type fetcherPeerInfo struct {
  55. root, lastAnnounced *fetcherTreeNode
  56. nodeCnt int
  57. confirmedTd *big.Int
  58. bestConfirmed *fetcherTreeNode
  59. nodeByHash map[common.Hash]*fetcherTreeNode
  60. firstUpdateStats *updateStatsEntry
  61. }
  62. // fetcherTreeNode is a node of a tree that holds information about blocks recently
  63. // announced and confirmed by a certain peer. Each new announce message from a peer
  64. // adds nodes to the tree, based on the previous announced head and the reorg depth.
  65. // There are three possible states for a tree node:
  66. // - announced: not downloaded (known) yet, but we know its head, number and td
  67. // - intermediate: not known, hash and td are empty, they are filled out when it becomes known
  68. // - known: both announced by this peer and downloaded (from any peer).
  69. // This structure makes it possible to always know which peer has a certain block,
  70. // which is necessary for selecting a suitable peer for ODR requests and also for
  71. // canonizing new heads. It also helps to always download the minimum necessary
  72. // amount of headers with a single request.
  73. type fetcherTreeNode struct {
  74. hash common.Hash
  75. number uint64
  76. td *big.Int
  77. known, requested bool
  78. parent *fetcherTreeNode
  79. children []*fetcherTreeNode
  80. }
  81. // fetchRequest represents a header download request
  82. type fetchRequest struct {
  83. hash common.Hash
  84. amount uint64
  85. peer *peer
  86. sent mclock.AbsTime
  87. timeout bool
  88. }
  89. // fetchResponse represents a header download response
  90. type fetchResponse struct {
  91. reqID uint64
  92. headers []*types.Header
  93. peer *peer
  94. }
  95. // newLightFetcher creates a new light fetcher
  96. func newLightFetcher(pm *ProtocolManager) *lightFetcher {
  97. f := &lightFetcher{
  98. pm: pm,
  99. chain: pm.blockchain.(*light.LightChain),
  100. odr: pm.odr,
  101. peers: make(map[*peer]*fetcherPeerInfo),
  102. deliverChn: make(chan fetchResponse, 100),
  103. requested: make(map[uint64]fetchRequest),
  104. timeoutChn: make(chan uint64),
  105. requestChn: make(chan bool, 100),
  106. syncDone: make(chan *peer),
  107. maxConfirmedTd: big.NewInt(0),
  108. }
  109. pm.peers.notify(f)
  110. f.pm.wg.Add(1)
  111. go f.syncLoop()
  112. return f
  113. }
  114. // syncLoop is the main event loop of the light fetcher
  115. func (f *lightFetcher) syncLoop() {
  116. requesting := false
  117. defer f.pm.wg.Done()
  118. for {
  119. select {
  120. case <-f.pm.quitSync:
  121. return
  122. // when a new announce is received, request loop keeps running until
  123. // no further requests are necessary or possible
  124. case newAnnounce := <-f.requestChn:
  125. f.lock.Lock()
  126. s := requesting
  127. requesting = false
  128. var (
  129. rq *distReq
  130. reqID uint64
  131. )
  132. if !f.syncing && !(newAnnounce && s) {
  133. rq, reqID = f.nextRequest()
  134. }
  135. syncing := f.syncing
  136. f.lock.Unlock()
  137. if rq != nil {
  138. requesting = true
  139. _, ok := <-f.pm.reqDist.queue(rq)
  140. if !ok {
  141. f.requestChn <- false
  142. }
  143. if !syncing {
  144. go func() {
  145. time.Sleep(softRequestTimeout)
  146. f.reqMu.Lock()
  147. req, ok := f.requested[reqID]
  148. if ok {
  149. req.timeout = true
  150. f.requested[reqID] = req
  151. }
  152. f.reqMu.Unlock()
  153. // keep starting new requests while possible
  154. f.requestChn <- false
  155. }()
  156. }
  157. }
  158. case reqID := <-f.timeoutChn:
  159. f.reqMu.Lock()
  160. req, ok := f.requested[reqID]
  161. if ok {
  162. delete(f.requested, reqID)
  163. }
  164. f.reqMu.Unlock()
  165. if ok {
  166. f.pm.serverPool.adjustResponseTime(req.peer.poolEntry, time.Duration(mclock.Now()-req.sent), true)
  167. req.peer.Log().Debug("Fetching data timed out hard")
  168. go f.pm.removePeer(req.peer.id)
  169. }
  170. case resp := <-f.deliverChn:
  171. f.reqMu.Lock()
  172. req, ok := f.requested[resp.reqID]
  173. if ok && req.peer != resp.peer {
  174. ok = false
  175. }
  176. if ok {
  177. delete(f.requested, resp.reqID)
  178. }
  179. f.reqMu.Unlock()
  180. if ok {
  181. f.pm.serverPool.adjustResponseTime(req.peer.poolEntry, time.Duration(mclock.Now()-req.sent), req.timeout)
  182. }
  183. f.lock.Lock()
  184. if !ok || !(f.syncing || f.processResponse(req, resp)) {
  185. resp.peer.Log().Debug("Failed processing response")
  186. go f.pm.removePeer(resp.peer.id)
  187. }
  188. f.lock.Unlock()
  189. case p := <-f.syncDone:
  190. f.lock.Lock()
  191. p.Log().Debug("Done synchronising with peer")
  192. f.checkSyncedHeaders(p)
  193. f.syncing = false
  194. f.lock.Unlock()
  195. }
  196. }
  197. }
  198. // registerPeer adds a new peer to the fetcher's peer set
  199. func (f *lightFetcher) registerPeer(p *peer) {
  200. p.lock.Lock()
  201. p.hasBlock = func(hash common.Hash, number uint64) bool {
  202. return f.peerHasBlock(p, hash, number)
  203. }
  204. p.lock.Unlock()
  205. f.lock.Lock()
  206. defer f.lock.Unlock()
  207. f.peers[p] = &fetcherPeerInfo{nodeByHash: make(map[common.Hash]*fetcherTreeNode)}
  208. }
  209. // unregisterPeer removes a new peer from the fetcher's peer set
  210. func (f *lightFetcher) unregisterPeer(p *peer) {
  211. p.lock.Lock()
  212. p.hasBlock = nil
  213. p.lock.Unlock()
  214. f.lock.Lock()
  215. defer f.lock.Unlock()
  216. // check for potential timed out block delay statistics
  217. f.checkUpdateStats(p, nil)
  218. delete(f.peers, p)
  219. }
  220. // announce processes a new announcement message received from a peer, adding new
  221. // nodes to the peer's block tree and removing old nodes if necessary
  222. func (f *lightFetcher) announce(p *peer, head *announceData) {
  223. f.lock.Lock()
  224. defer f.lock.Unlock()
  225. p.Log().Debug("Received new announcement", "number", head.Number, "hash", head.Hash, "reorg", head.ReorgDepth)
  226. fp := f.peers[p]
  227. if fp == nil {
  228. p.Log().Debug("Announcement from unknown peer")
  229. return
  230. }
  231. if fp.lastAnnounced != nil && head.Td.Cmp(fp.lastAnnounced.td) <= 0 {
  232. // announced tds should be strictly monotonic
  233. p.Log().Debug("Received non-monotonic td", "current", head.Td, "previous", fp.lastAnnounced.td)
  234. go f.pm.removePeer(p.id)
  235. return
  236. }
  237. n := fp.lastAnnounced
  238. for i := uint64(0); i < head.ReorgDepth; i++ {
  239. if n == nil {
  240. break
  241. }
  242. n = n.parent
  243. }
  244. if n != nil {
  245. // n is now the reorg common ancestor, add a new branch of nodes
  246. // check if the node count is too high to add new nodes
  247. locked := false
  248. for uint64(fp.nodeCnt)+head.Number-n.number > maxNodeCount && fp.root != nil {
  249. if !locked {
  250. f.chain.LockChain()
  251. defer f.chain.UnlockChain()
  252. locked = true
  253. }
  254. // if one of root's children is canonical, keep it, delete other branches and root itself
  255. var newRoot *fetcherTreeNode
  256. for i, nn := range fp.root.children {
  257. if rawdb.ReadCanonicalHash(f.pm.chainDb, nn.number) == nn.hash {
  258. fp.root.children = append(fp.root.children[:i], fp.root.children[i+1:]...)
  259. nn.parent = nil
  260. newRoot = nn
  261. break
  262. }
  263. }
  264. fp.deleteNode(fp.root)
  265. if n == fp.root {
  266. n = newRoot
  267. }
  268. fp.root = newRoot
  269. if newRoot == nil || !f.checkKnownNode(p, newRoot) {
  270. fp.bestConfirmed = nil
  271. fp.confirmedTd = nil
  272. }
  273. if n == nil {
  274. break
  275. }
  276. }
  277. if n != nil {
  278. for n.number < head.Number {
  279. nn := &fetcherTreeNode{number: n.number + 1, parent: n}
  280. n.children = append(n.children, nn)
  281. n = nn
  282. fp.nodeCnt++
  283. }
  284. n.hash = head.Hash
  285. n.td = head.Td
  286. fp.nodeByHash[n.hash] = n
  287. }
  288. }
  289. if n == nil {
  290. // could not find reorg common ancestor or had to delete entire tree, a new root and a resync is needed
  291. if fp.root != nil {
  292. fp.deleteNode(fp.root)
  293. }
  294. n = &fetcherTreeNode{hash: head.Hash, number: head.Number, td: head.Td}
  295. fp.root = n
  296. fp.nodeCnt++
  297. fp.nodeByHash[n.hash] = n
  298. fp.bestConfirmed = nil
  299. fp.confirmedTd = nil
  300. }
  301. f.checkKnownNode(p, n)
  302. p.lock.Lock()
  303. p.headInfo = head
  304. fp.lastAnnounced = n
  305. p.lock.Unlock()
  306. f.checkUpdateStats(p, nil)
  307. f.requestChn <- true
  308. }
  309. // peerHasBlock returns true if we can assume the peer knows the given block
  310. // based on its announcements
  311. func (f *lightFetcher) peerHasBlock(p *peer, hash common.Hash, number uint64) bool {
  312. f.lock.Lock()
  313. defer f.lock.Unlock()
  314. if f.syncing {
  315. // always return true when syncing
  316. // false positives are acceptable, a more sophisticated condition can be implemented later
  317. return true
  318. }
  319. fp := f.peers[p]
  320. if fp == nil || fp.root == nil {
  321. return false
  322. }
  323. if number >= fp.root.number {
  324. // it is recent enough that if it is known, is should be in the peer's block tree
  325. return fp.nodeByHash[hash] != nil
  326. }
  327. f.chain.LockChain()
  328. defer f.chain.UnlockChain()
  329. // if it's older than the peer's block tree root but it's in the same canonical chain
  330. // as the root, we can still be sure the peer knows it
  331. //
  332. // when syncing, just check if it is part of the known chain, there is nothing better we
  333. // can do since we do not know the most recent block hash yet
  334. return rawdb.ReadCanonicalHash(f.pm.chainDb, fp.root.number) == fp.root.hash && rawdb.ReadCanonicalHash(f.pm.chainDb, number) == hash
  335. }
  336. // requestAmount calculates the amount of headers to be downloaded starting
  337. // from a certain head backwards
  338. func (f *lightFetcher) requestAmount(p *peer, n *fetcherTreeNode) uint64 {
  339. amount := uint64(0)
  340. nn := n
  341. for nn != nil && !f.checkKnownNode(p, nn) {
  342. nn = nn.parent
  343. amount++
  344. }
  345. if nn == nil {
  346. amount = n.number
  347. }
  348. return amount
  349. }
  350. // requestedID tells if a certain reqID has been requested by the fetcher
  351. func (f *lightFetcher) requestedID(reqID uint64) bool {
  352. f.reqMu.RLock()
  353. _, ok := f.requested[reqID]
  354. f.reqMu.RUnlock()
  355. return ok
  356. }
  357. // nextRequest selects the peer and announced head to be requested next, amount
  358. // to be downloaded starting from the head backwards is also returned
  359. func (f *lightFetcher) nextRequest() (*distReq, uint64) {
  360. var (
  361. bestHash common.Hash
  362. bestAmount uint64
  363. )
  364. bestTd := f.maxConfirmedTd
  365. bestSyncing := false
  366. for p, fp := range f.peers {
  367. for hash, n := range fp.nodeByHash {
  368. if !f.checkKnownNode(p, n) && !n.requested && (bestTd == nil || n.td.Cmp(bestTd) >= 0) {
  369. amount := f.requestAmount(p, n)
  370. if bestTd == nil || n.td.Cmp(bestTd) > 0 || amount < bestAmount {
  371. bestHash = hash
  372. bestAmount = amount
  373. bestTd = n.td
  374. bestSyncing = fp.bestConfirmed == nil || fp.root == nil || !f.checkKnownNode(p, fp.root)
  375. }
  376. }
  377. }
  378. }
  379. if bestTd == f.maxConfirmedTd {
  380. return nil, 0
  381. }
  382. f.syncing = bestSyncing
  383. var rq *distReq
  384. reqID := genReqID()
  385. if f.syncing {
  386. rq = &distReq{
  387. getCost: func(dp distPeer) uint64 {
  388. return 0
  389. },
  390. canSend: func(dp distPeer) bool {
  391. p := dp.(*peer)
  392. f.lock.Lock()
  393. defer f.lock.Unlock()
  394. fp := f.peers[p]
  395. return fp != nil && fp.nodeByHash[bestHash] != nil
  396. },
  397. request: func(dp distPeer) func() {
  398. go func() {
  399. p := dp.(*peer)
  400. p.Log().Debug("Synchronisation started")
  401. f.pm.synchronise(p)
  402. f.syncDone <- p
  403. }()
  404. return nil
  405. },
  406. }
  407. } else {
  408. rq = &distReq{
  409. getCost: func(dp distPeer) uint64 {
  410. p := dp.(*peer)
  411. return p.GetRequestCost(GetBlockHeadersMsg, int(bestAmount))
  412. },
  413. canSend: func(dp distPeer) bool {
  414. p := dp.(*peer)
  415. f.lock.Lock()
  416. defer f.lock.Unlock()
  417. fp := f.peers[p]
  418. if fp == nil {
  419. return false
  420. }
  421. n := fp.nodeByHash[bestHash]
  422. return n != nil && !n.requested
  423. },
  424. request: func(dp distPeer) func() {
  425. p := dp.(*peer)
  426. f.lock.Lock()
  427. fp := f.peers[p]
  428. if fp != nil {
  429. n := fp.nodeByHash[bestHash]
  430. if n != nil {
  431. n.requested = true
  432. }
  433. }
  434. f.lock.Unlock()
  435. cost := p.GetRequestCost(GetBlockHeadersMsg, int(bestAmount))
  436. p.fcServer.QueueRequest(reqID, cost)
  437. f.reqMu.Lock()
  438. f.requested[reqID] = fetchRequest{hash: bestHash, amount: bestAmount, peer: p, sent: mclock.Now()}
  439. f.reqMu.Unlock()
  440. go func() {
  441. time.Sleep(hardRequestTimeout)
  442. f.timeoutChn <- reqID
  443. }()
  444. return func() { p.RequestHeadersByHash(reqID, cost, bestHash, int(bestAmount), 0, true) }
  445. },
  446. }
  447. }
  448. return rq, reqID
  449. }
  450. // deliverHeaders delivers header download request responses for processing
  451. func (f *lightFetcher) deliverHeaders(peer *peer, reqID uint64, headers []*types.Header) {
  452. f.deliverChn <- fetchResponse{reqID: reqID, headers: headers, peer: peer}
  453. }
  454. // processResponse processes header download request responses, returns true if successful
  455. func (f *lightFetcher) processResponse(req fetchRequest, resp fetchResponse) bool {
  456. if uint64(len(resp.headers)) != req.amount || resp.headers[0].Hash() != req.hash {
  457. req.peer.Log().Debug("Response content mismatch", "requested", len(resp.headers), "reqfrom", resp.headers[0], "delivered", req.amount, "delfrom", req.hash)
  458. return false
  459. }
  460. headers := make([]*types.Header, req.amount)
  461. for i, header := range resp.headers {
  462. headers[int(req.amount)-1-i] = header
  463. }
  464. if _, err := f.chain.InsertHeaderChain(headers, 1); err != nil {
  465. if err == consensus.ErrFutureBlock {
  466. return true
  467. }
  468. log.Debug("Failed to insert header chain", "err", err)
  469. return false
  470. }
  471. tds := make([]*big.Int, len(headers))
  472. for i, header := range headers {
  473. td := f.chain.GetTd(header.Hash(), header.Number.Uint64())
  474. if td == nil {
  475. log.Debug("Total difficulty not found for header", "index", i+1, "number", header.Number, "hash", header.Hash())
  476. return false
  477. }
  478. tds[i] = td
  479. }
  480. f.newHeaders(headers, tds)
  481. return true
  482. }
  483. // newHeaders updates the block trees of all active peers according to a newly
  484. // downloaded and validated batch or headers
  485. func (f *lightFetcher) newHeaders(headers []*types.Header, tds []*big.Int) {
  486. var maxTd *big.Int
  487. for p, fp := range f.peers {
  488. if !f.checkAnnouncedHeaders(fp, headers, tds) {
  489. p.Log().Debug("Inconsistent announcement")
  490. go f.pm.removePeer(p.id)
  491. }
  492. if fp.confirmedTd != nil && (maxTd == nil || maxTd.Cmp(fp.confirmedTd) > 0) {
  493. maxTd = fp.confirmedTd
  494. }
  495. }
  496. if maxTd != nil {
  497. f.updateMaxConfirmedTd(maxTd)
  498. }
  499. }
  500. // checkAnnouncedHeaders updates peer's block tree if necessary after validating
  501. // a batch of headers. It searches for the latest header in the batch that has a
  502. // matching tree node (if any), and if it has not been marked as known already,
  503. // sets it and its parents to known (even those which are older than the currently
  504. // validated ones). Return value shows if all hashes, numbers and Tds matched
  505. // correctly to the announced values (otherwise the peer should be dropped).
  506. func (f *lightFetcher) checkAnnouncedHeaders(fp *fetcherPeerInfo, headers []*types.Header, tds []*big.Int) bool {
  507. var (
  508. n *fetcherTreeNode
  509. header *types.Header
  510. td *big.Int
  511. )
  512. for i := len(headers) - 1; ; i-- {
  513. if i < 0 {
  514. if n == nil {
  515. // no more headers and nothing to match
  516. return true
  517. }
  518. // we ran out of recently delivered headers but have not reached a node known by this peer yet, continue matching
  519. hash, number := header.ParentHash, header.Number.Uint64()-1
  520. td = f.chain.GetTd(hash, number)
  521. header = f.chain.GetHeader(hash, number)
  522. if header == nil || td == nil {
  523. log.Error("Missing parent of validated header", "hash", hash, "number", number)
  524. return false
  525. }
  526. } else {
  527. header = headers[i]
  528. td = tds[i]
  529. }
  530. hash := header.Hash()
  531. number := header.Number.Uint64()
  532. if n == nil {
  533. n = fp.nodeByHash[hash]
  534. }
  535. if n != nil {
  536. if n.td == nil {
  537. // node was unannounced
  538. if nn := fp.nodeByHash[hash]; nn != nil {
  539. // if there was already a node with the same hash, continue there and drop this one
  540. nn.children = append(nn.children, n.children...)
  541. n.children = nil
  542. fp.deleteNode(n)
  543. n = nn
  544. } else {
  545. n.hash = hash
  546. n.td = td
  547. fp.nodeByHash[hash] = n
  548. }
  549. }
  550. // check if it matches the header
  551. if n.hash != hash || n.number != number || n.td.Cmp(td) != 0 {
  552. // peer has previously made an invalid announcement
  553. return false
  554. }
  555. if n.known {
  556. // we reached a known node that matched our expectations, return with success
  557. return true
  558. }
  559. n.known = true
  560. if fp.confirmedTd == nil || td.Cmp(fp.confirmedTd) > 0 {
  561. fp.confirmedTd = td
  562. fp.bestConfirmed = n
  563. }
  564. n = n.parent
  565. if n == nil {
  566. return true
  567. }
  568. }
  569. }
  570. }
  571. // checkSyncedHeaders updates peer's block tree after synchronisation by marking
  572. // downloaded headers as known. If none of the announced headers are found after
  573. // syncing, the peer is dropped.
  574. func (f *lightFetcher) checkSyncedHeaders(p *peer) {
  575. fp := f.peers[p]
  576. if fp == nil {
  577. p.Log().Debug("Unknown peer to check sync headers")
  578. return
  579. }
  580. n := fp.lastAnnounced
  581. var td *big.Int
  582. for n != nil {
  583. if td = f.chain.GetTd(n.hash, n.number); td != nil {
  584. break
  585. }
  586. n = n.parent
  587. }
  588. // now n is the latest downloaded header after syncing
  589. if n == nil {
  590. p.Log().Debug("Synchronisation failed")
  591. go f.pm.removePeer(p.id)
  592. } else {
  593. header := f.chain.GetHeader(n.hash, n.number)
  594. f.newHeaders([]*types.Header{header}, []*big.Int{td})
  595. }
  596. }
  597. // checkKnownNode checks if a block tree node is known (downloaded and validated)
  598. // If it was not known previously but found in the database, sets its known flag
  599. func (f *lightFetcher) checkKnownNode(p *peer, n *fetcherTreeNode) bool {
  600. if n.known {
  601. return true
  602. }
  603. td := f.chain.GetTd(n.hash, n.number)
  604. if td == nil {
  605. return false
  606. }
  607. header := f.chain.GetHeader(n.hash, n.number)
  608. // check the availability of both header and td because reads are not protected by chain db mutex
  609. // Note: returning false is always safe here
  610. if header == nil {
  611. return false
  612. }
  613. fp := f.peers[p]
  614. if fp == nil {
  615. p.Log().Debug("Unknown peer to check known nodes")
  616. return false
  617. }
  618. if !f.checkAnnouncedHeaders(fp, []*types.Header{header}, []*big.Int{td}) {
  619. p.Log().Debug("Inconsistent announcement")
  620. go f.pm.removePeer(p.id)
  621. }
  622. if fp.confirmedTd != nil {
  623. f.updateMaxConfirmedTd(fp.confirmedTd)
  624. }
  625. return n.known
  626. }
  627. // deleteNode deletes a node and its child subtrees from a peer's block tree
  628. func (fp *fetcherPeerInfo) deleteNode(n *fetcherTreeNode) {
  629. if n.parent != nil {
  630. for i, nn := range n.parent.children {
  631. if nn == n {
  632. n.parent.children = append(n.parent.children[:i], n.parent.children[i+1:]...)
  633. break
  634. }
  635. }
  636. }
  637. for {
  638. if n.td != nil {
  639. delete(fp.nodeByHash, n.hash)
  640. }
  641. fp.nodeCnt--
  642. if len(n.children) == 0 {
  643. return
  644. }
  645. for i, nn := range n.children {
  646. if i == 0 {
  647. n = nn
  648. } else {
  649. fp.deleteNode(nn)
  650. }
  651. }
  652. }
  653. }
  654. // updateStatsEntry items form a linked list that is expanded with a new item every time a new head with a higher Td
  655. // than the previous one has been downloaded and validated. The list contains a series of maximum confirmed Td values
  656. // and the time these values have been confirmed, both increasing monotonically. A maximum confirmed Td is calculated
  657. // both globally for all peers and also for each individual peer (meaning that the given peer has announced the head
  658. // and it has also been downloaded from any peer, either before or after the given announcement).
  659. // The linked list has a global tail where new confirmed Td entries are added and a separate head for each peer,
  660. // pointing to the next Td entry that is higher than the peer's max confirmed Td (nil if it has already confirmed
  661. // the current global head).
  662. type updateStatsEntry struct {
  663. time mclock.AbsTime
  664. td *big.Int
  665. next *updateStatsEntry
  666. }
  667. // updateMaxConfirmedTd updates the block delay statistics of active peers. Whenever a new highest Td is confirmed,
  668. // adds it to the end of a linked list together with the time it has been confirmed. Then checks which peers have
  669. // already confirmed a head with the same or higher Td (which counts as zero block delay) and updates their statistics.
  670. // Those who have not confirmed such a head by now will be updated by a subsequent checkUpdateStats call with a
  671. // positive block delay value.
  672. func (f *lightFetcher) updateMaxConfirmedTd(td *big.Int) {
  673. if f.maxConfirmedTd == nil || td.Cmp(f.maxConfirmedTd) > 0 {
  674. f.maxConfirmedTd = td
  675. newEntry := &updateStatsEntry{
  676. time: mclock.Now(),
  677. td: td,
  678. }
  679. if f.lastUpdateStats != nil {
  680. f.lastUpdateStats.next = newEntry
  681. }
  682. f.lastUpdateStats = newEntry
  683. for p := range f.peers {
  684. f.checkUpdateStats(p, newEntry)
  685. }
  686. }
  687. }
  688. // checkUpdateStats checks those peers who have not confirmed a certain highest Td (or a larger one) by the time it
  689. // has been confirmed by another peer. If they have confirmed such a head by now, their stats are updated with the
  690. // block delay which is (this peer's confirmation time)-(first confirmation time). After blockDelayTimeout has passed,
  691. // the stats are updated with blockDelayTimeout value. In either case, the confirmed or timed out updateStatsEntry
  692. // items are removed from the head of the linked list.
  693. // If a new entry has been added to the global tail, it is passed as a parameter here even though this function
  694. // assumes that it has already been added, so that if the peer's list is empty (all heads confirmed, head is nil),
  695. // it can set the new head to newEntry.
  696. func (f *lightFetcher) checkUpdateStats(p *peer, newEntry *updateStatsEntry) {
  697. now := mclock.Now()
  698. fp := f.peers[p]
  699. if fp == nil {
  700. p.Log().Debug("Unknown peer to check update stats")
  701. return
  702. }
  703. if newEntry != nil && fp.firstUpdateStats == nil {
  704. fp.firstUpdateStats = newEntry
  705. }
  706. for fp.firstUpdateStats != nil && fp.firstUpdateStats.time <= now-mclock.AbsTime(blockDelayTimeout) {
  707. f.pm.serverPool.adjustBlockDelay(p.poolEntry, blockDelayTimeout)
  708. fp.firstUpdateStats = fp.firstUpdateStats.next
  709. }
  710. if fp.confirmedTd != nil {
  711. for fp.firstUpdateStats != nil && fp.firstUpdateStats.td.Cmp(fp.confirmedTd) <= 0 {
  712. f.pm.serverPool.adjustBlockDelay(p.poolEntry, time.Duration(now-fp.firstUpdateStats.time))
  713. fp.firstUpdateStats = fp.firstUpdateStats.next
  714. }
  715. }
  716. }