fetcher_test.go 30 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791
  1. // Copyright 2015 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package fetcher
  17. import (
  18. "errors"
  19. "math/big"
  20. "sync"
  21. "sync/atomic"
  22. "testing"
  23. "time"
  24. "github.com/ethereum/go-ethereum/common"
  25. "github.com/ethereum/go-ethereum/consensus/ethash"
  26. "github.com/ethereum/go-ethereum/core"
  27. "github.com/ethereum/go-ethereum/core/types"
  28. "github.com/ethereum/go-ethereum/crypto"
  29. "github.com/ethereum/go-ethereum/ethdb"
  30. "github.com/ethereum/go-ethereum/params"
  31. )
// Shared fixtures used by the tests in this file.
var (
	// testdb is the database handed to the genesis and chain generators below.
	testdb = ethdb.NewMemDatabase()
	// testKey signs the transactions created by makeChain. The conversion error
	// is deliberately ignored: the input is a hard-coded hex constant.
	testKey, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
	// testAddress is the address derived from testKey's public key.
	testAddress = crypto.PubkeyToAddress(testKey.PublicKey)
	// genesis is the root block of the test chains, built from testdb,
	// testAddress and the value 1000000000.
	genesis = core.GenesisBlockForTesting(testdb, testAddress, big.NewInt(1000000000))
	// unknownBlock is a block built from a bare header that only sets the gas
	// limit; it serves as a chain root that is distinct from genesis.
	unknownBlock = types.NewBlock(&types.Header{GasLimit: params.GenesisGasLimit}, nil, nil, nil)
)
  39. // makeChain creates a chain of n blocks starting at and including parent.
  40. // the returned hash chain is ordered head->parent. In addition, every 3rd block
  41. // contains a transaction and every 5th an uncle to allow testing correct block
  42. // reassembly.
  43. func makeChain(n int, seed byte, parent *types.Block) ([]common.Hash, map[common.Hash]*types.Block) {
  44. blocks, _ := core.GenerateChain(params.TestChainConfig, parent, ethash.NewFaker(), testdb, n, func(i int, block *core.BlockGen) {
  45. block.SetCoinbase(common.Address{seed})
  46. // If the block number is multiple of 3, send a bonus transaction to the miner
  47. if parent == genesis && i%3 == 0 {
  48. signer := types.MakeSigner(params.TestChainConfig, block.Number())
  49. tx, err := types.SignTx(types.NewTransaction(block.TxNonce(testAddress), common.Address{seed}, big.NewInt(1000), params.TxGas, nil, nil), signer, testKey)
  50. if err != nil {
  51. panic(err)
  52. }
  53. block.AddTx(tx)
  54. }
  55. // If the block number is a multiple of 5, add a bonus uncle to the block
  56. if i%5 == 0 {
  57. block.AddUncle(&types.Header{ParentHash: block.PrevBlock(i - 1).Hash(), Number: big.NewInt(int64(i - 1))})
  58. }
  59. })
  60. hashes := make([]common.Hash, n+1)
  61. hashes[len(hashes)-1] = parent.Hash()
  62. blockm := make(map[common.Hash]*types.Block, n+1)
  63. blockm[parent.Hash()] = parent
  64. for i, b := range blocks {
  65. hashes[len(hashes)-i-2] = b.Hash()
  66. blockm[b.Hash()] = b
  67. }
  68. return hashes, blockm
  69. }
// fetcherTester is a test simulator for mocking out local block chain. Its
// methods are handed to the fetcher as callbacks (see newTester).
type fetcherTester struct {
	fetcher *Fetcher // Fetcher instance wired to this tester's callbacks

	hashes []common.Hash                // Hash chain belonging to the tester
	blocks map[common.Hash]*types.Block // Blocks belonging to the tester
	drops  map[string]bool              // Map of peers dropped by the fetcher

	lock sync.RWMutex // Guards hashes, blocks and drops
}
  78. // newTester creates a new fetcher test mocker.
  79. func newTester() *fetcherTester {
  80. tester := &fetcherTester{
  81. hashes: []common.Hash{genesis.Hash()},
  82. blocks: map[common.Hash]*types.Block{genesis.Hash(): genesis},
  83. drops: make(map[string]bool),
  84. }
  85. tester.fetcher = New(tester.getBlock, tester.verifyHeader, tester.broadcastBlock, tester.chainHeight, tester.insertChain, tester.dropPeer)
  86. tester.fetcher.Start()
  87. return tester
  88. }
  89. // getBlock retrieves a block from the tester's block chain.
  90. func (f *fetcherTester) getBlock(hash common.Hash) *types.Block {
  91. f.lock.RLock()
  92. defer f.lock.RUnlock()
  93. return f.blocks[hash]
  94. }
// verifyHeader is a nop placeholder for the block header verification. It
// accepts every header by always returning a nil error.
func (f *fetcherTester) verifyHeader(header *types.Header) error {
	return nil
}
// broadcastBlock is a nop placeholder for the block broadcasting. Both
// arguments are ignored.
func (f *fetcherTester) broadcastBlock(block *types.Block, propagate bool) {
}
  102. // chainHeight retrieves the current height (block number) of the chain.
  103. func (f *fetcherTester) chainHeight() uint64 {
  104. f.lock.RLock()
  105. defer f.lock.RUnlock()
  106. return f.blocks[f.hashes[len(f.hashes)-1]].NumberU64()
  107. }
  108. // insertChain injects a new blocks into the simulated chain.
  109. func (f *fetcherTester) insertChain(blocks types.Blocks) (int, error) {
  110. f.lock.Lock()
  111. defer f.lock.Unlock()
  112. for i, block := range blocks {
  113. // Make sure the parent in known
  114. if _, ok := f.blocks[block.ParentHash()]; !ok {
  115. return i, errors.New("unknown parent")
  116. }
  117. // Discard any new blocks if the same height already exists
  118. if block.NumberU64() <= f.blocks[f.hashes[len(f.hashes)-1]].NumberU64() {
  119. return i, nil
  120. }
  121. // Otherwise build our current chain
  122. f.hashes = append(f.hashes, block.Hash())
  123. f.blocks[block.Hash()] = block
  124. }
  125. return 0, nil
  126. }
// dropPeer is an emulator for the peer removal, simply accumulating the various
// peers dropped by the fetcher. The peer id is recorded in the drops map under
// the tester's write lock.
func (f *fetcherTester) dropPeer(peer string) {
	f.lock.Lock()
	defer f.lock.Unlock()
	f.drops[peer] = true
}
  134. // makeHeaderFetcher retrieves a block header fetcher associated with a simulated peer.
  135. func (f *fetcherTester) makeHeaderFetcher(peer string, blocks map[common.Hash]*types.Block, drift time.Duration) headerRequesterFn {
  136. closure := make(map[common.Hash]*types.Block)
  137. for hash, block := range blocks {
  138. closure[hash] = block
  139. }
  140. // Create a function that return a header from the closure
  141. return func(hash common.Hash) error {
  142. // Gather the blocks to return
  143. headers := make([]*types.Header, 0, 1)
  144. if block, ok := closure[hash]; ok {
  145. headers = append(headers, block.Header())
  146. }
  147. // Return on a new thread
  148. go f.fetcher.FilterHeaders(peer, headers, time.Now().Add(drift))
  149. return nil
  150. }
  151. }
  152. // makeBodyFetcher retrieves a block body fetcher associated with a simulated peer.
  153. func (f *fetcherTester) makeBodyFetcher(peer string, blocks map[common.Hash]*types.Block, drift time.Duration) bodyRequesterFn {
  154. closure := make(map[common.Hash]*types.Block)
  155. for hash, block := range blocks {
  156. closure[hash] = block
  157. }
  158. // Create a function that returns blocks from the closure
  159. return func(hashes []common.Hash) error {
  160. // Gather the block bodies to return
  161. transactions := make([][]*types.Transaction, 0, len(hashes))
  162. uncles := make([][]*types.Header, 0, len(hashes))
  163. for _, hash := range hashes {
  164. if block, ok := closure[hash]; ok {
  165. transactions = append(transactions, block.Transactions())
  166. uncles = append(uncles, block.Uncles())
  167. }
  168. }
  169. // Return on a new thread
  170. go f.fetcher.FilterBodies(peer, transactions, uncles, time.Now().Add(drift))
  171. return nil
  172. }
  173. }
  174. // verifyFetchingEvent verifies that one single event arrive on a fetching channel.
  175. func verifyFetchingEvent(t *testing.T, fetching chan []common.Hash, arrive bool) {
  176. if arrive {
  177. select {
  178. case <-fetching:
  179. case <-time.After(time.Second):
  180. t.Fatalf("fetching timeout")
  181. }
  182. } else {
  183. select {
  184. case <-fetching:
  185. t.Fatalf("fetching invoked")
  186. case <-time.After(10 * time.Millisecond):
  187. }
  188. }
  189. }
  190. // verifyCompletingEvent verifies that one single event arrive on an completing channel.
  191. func verifyCompletingEvent(t *testing.T, completing chan []common.Hash, arrive bool) {
  192. if arrive {
  193. select {
  194. case <-completing:
  195. case <-time.After(time.Second):
  196. t.Fatalf("completing timeout")
  197. }
  198. } else {
  199. select {
  200. case <-completing:
  201. t.Fatalf("completing invoked")
  202. case <-time.After(10 * time.Millisecond):
  203. }
  204. }
  205. }
  206. // verifyImportEvent verifies that one single event arrive on an import channel.
  207. func verifyImportEvent(t *testing.T, imported chan *types.Block, arrive bool) {
  208. if arrive {
  209. select {
  210. case <-imported:
  211. case <-time.After(time.Second):
  212. t.Fatalf("import timeout")
  213. }
  214. } else {
  215. select {
  216. case <-imported:
  217. t.Fatalf("import invoked")
  218. case <-time.After(10 * time.Millisecond):
  219. }
  220. }
  221. }
  222. // verifyImportCount verifies that exactly count number of events arrive on an
  223. // import hook channel.
  224. func verifyImportCount(t *testing.T, imported chan *types.Block, count int) {
  225. for i := 0; i < count; i++ {
  226. select {
  227. case <-imported:
  228. case <-time.After(time.Second):
  229. t.Fatalf("block %d: import timeout", i+1)
  230. }
  231. }
  232. verifyImportDone(t, imported)
  233. }
  234. // verifyImportDone verifies that no more events are arriving on an import channel.
  235. func verifyImportDone(t *testing.T, imported chan *types.Block) {
  236. select {
  237. case <-imported:
  238. t.Fatalf("extra block imported")
  239. case <-time.After(50 * time.Millisecond):
  240. }
  241. }
  242. // Tests that a fetcher accepts block announcements and initiates retrievals for
  243. // them, successfully importing into the local chain.
  244. func TestSequentialAnnouncements62(t *testing.T) { testSequentialAnnouncements(t, 62) }
  245. func TestSequentialAnnouncements63(t *testing.T) { testSequentialAnnouncements(t, 63) }
  246. func TestSequentialAnnouncements64(t *testing.T) { testSequentialAnnouncements(t, 64) }
  247. func testSequentialAnnouncements(t *testing.T, protocol int) {
  248. // Create a chain of blocks to import
  249. targetBlocks := 4 * hashLimit
  250. hashes, blocks := makeChain(targetBlocks, 0, genesis)
  251. tester := newTester()
  252. headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack)
  253. bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0)
  254. // Iteratively announce blocks until all are imported
  255. imported := make(chan *types.Block)
  256. tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
  257. for i := len(hashes) - 2; i >= 0; i-- {
  258. tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
  259. verifyImportEvent(t, imported, true)
  260. }
  261. verifyImportDone(t, imported)
  262. }
  263. // Tests that if blocks are announced by multiple peers (or even the same buggy
  264. // peer), they will only get downloaded at most once.
  265. func TestConcurrentAnnouncements62(t *testing.T) { testConcurrentAnnouncements(t, 62) }
  266. func TestConcurrentAnnouncements63(t *testing.T) { testConcurrentAnnouncements(t, 63) }
  267. func TestConcurrentAnnouncements64(t *testing.T) { testConcurrentAnnouncements(t, 64) }
  268. func testConcurrentAnnouncements(t *testing.T, protocol int) {
  269. // Create a chain of blocks to import
  270. targetBlocks := 4 * hashLimit
  271. hashes, blocks := makeChain(targetBlocks, 0, genesis)
  272. // Assemble a tester with a built in counter for the requests
  273. tester := newTester()
  274. firstHeaderFetcher := tester.makeHeaderFetcher("first", blocks, -gatherSlack)
  275. firstBodyFetcher := tester.makeBodyFetcher("first", blocks, 0)
  276. secondHeaderFetcher := tester.makeHeaderFetcher("second", blocks, -gatherSlack)
  277. secondBodyFetcher := tester.makeBodyFetcher("second", blocks, 0)
  278. counter := uint32(0)
  279. firstHeaderWrapper := func(hash common.Hash) error {
  280. atomic.AddUint32(&counter, 1)
  281. return firstHeaderFetcher(hash)
  282. }
  283. secondHeaderWrapper := func(hash common.Hash) error {
  284. atomic.AddUint32(&counter, 1)
  285. return secondHeaderFetcher(hash)
  286. }
  287. // Iteratively announce blocks until all are imported
  288. imported := make(chan *types.Block)
  289. tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
  290. for i := len(hashes) - 2; i >= 0; i-- {
  291. tester.fetcher.Notify("first", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), firstHeaderWrapper, firstBodyFetcher)
  292. tester.fetcher.Notify("second", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout+time.Millisecond), secondHeaderWrapper, secondBodyFetcher)
  293. tester.fetcher.Notify("second", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout-time.Millisecond), secondHeaderWrapper, secondBodyFetcher)
  294. verifyImportEvent(t, imported, true)
  295. }
  296. verifyImportDone(t, imported)
  297. // Make sure no blocks were retrieved twice
  298. if int(counter) != targetBlocks {
  299. t.Fatalf("retrieval count mismatch: have %v, want %v", counter, targetBlocks)
  300. }
  301. }
  302. // Tests that announcements arriving while a previous is being fetched still
  303. // results in a valid import.
  304. func TestOverlappingAnnouncements62(t *testing.T) { testOverlappingAnnouncements(t, 62) }
  305. func TestOverlappingAnnouncements63(t *testing.T) { testOverlappingAnnouncements(t, 63) }
  306. func TestOverlappingAnnouncements64(t *testing.T) { testOverlappingAnnouncements(t, 64) }
  307. func testOverlappingAnnouncements(t *testing.T, protocol int) {
  308. // Create a chain of blocks to import
  309. targetBlocks := 4 * hashLimit
  310. hashes, blocks := makeChain(targetBlocks, 0, genesis)
  311. tester := newTester()
  312. headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack)
  313. bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0)
  314. // Iteratively announce blocks, but overlap them continuously
  315. overlap := 16
  316. imported := make(chan *types.Block, len(hashes)-1)
  317. for i := 0; i < overlap; i++ {
  318. imported <- nil
  319. }
  320. tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
  321. for i := len(hashes) - 2; i >= 0; i-- {
  322. tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
  323. select {
  324. case <-imported:
  325. case <-time.After(time.Second):
  326. t.Fatalf("block %d: import timeout", len(hashes)-i)
  327. }
  328. }
  329. // Wait for all the imports to complete and check count
  330. verifyImportCount(t, imported, overlap)
  331. }
  332. // Tests that announces already being retrieved will not be duplicated.
  333. func TestPendingDeduplication62(t *testing.T) { testPendingDeduplication(t, 62) }
  334. func TestPendingDeduplication63(t *testing.T) { testPendingDeduplication(t, 63) }
  335. func TestPendingDeduplication64(t *testing.T) { testPendingDeduplication(t, 64) }
  336. func testPendingDeduplication(t *testing.T, protocol int) {
  337. // Create a hash and corresponding block
  338. hashes, blocks := makeChain(1, 0, genesis)
  339. // Assemble a tester with a built in counter and delayed fetcher
  340. tester := newTester()
  341. headerFetcher := tester.makeHeaderFetcher("repeater", blocks, -gatherSlack)
  342. bodyFetcher := tester.makeBodyFetcher("repeater", blocks, 0)
  343. delay := 50 * time.Millisecond
  344. counter := uint32(0)
  345. headerWrapper := func(hash common.Hash) error {
  346. atomic.AddUint32(&counter, 1)
  347. // Simulate a long running fetch
  348. go func() {
  349. time.Sleep(delay)
  350. headerFetcher(hash)
  351. }()
  352. return nil
  353. }
  354. // Announce the same block many times until it's fetched (wait for any pending ops)
  355. for tester.getBlock(hashes[0]) == nil {
  356. tester.fetcher.Notify("repeater", hashes[0], 1, time.Now().Add(-arriveTimeout), headerWrapper, bodyFetcher)
  357. time.Sleep(time.Millisecond)
  358. }
  359. time.Sleep(delay)
  360. // Check that all blocks were imported and none fetched twice
  361. if imported := len(tester.blocks); imported != 2 {
  362. t.Fatalf("synchronised block mismatch: have %v, want %v", imported, 2)
  363. }
  364. if int(counter) != 1 {
  365. t.Fatalf("retrieval count mismatch: have %v, want %v", counter, 1)
  366. }
  367. }
  368. // Tests that announcements retrieved in a random order are cached and eventually
  369. // imported when all the gaps are filled in.
  370. func TestRandomArrivalImport62(t *testing.T) { testRandomArrivalImport(t, 62) }
  371. func TestRandomArrivalImport63(t *testing.T) { testRandomArrivalImport(t, 63) }
  372. func TestRandomArrivalImport64(t *testing.T) { testRandomArrivalImport(t, 64) }
  373. func testRandomArrivalImport(t *testing.T, protocol int) {
  374. // Create a chain of blocks to import, and choose one to delay
  375. targetBlocks := maxQueueDist
  376. hashes, blocks := makeChain(targetBlocks, 0, genesis)
  377. skip := targetBlocks / 2
  378. tester := newTester()
  379. headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack)
  380. bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0)
  381. // Iteratively announce blocks, skipping one entry
  382. imported := make(chan *types.Block, len(hashes)-1)
  383. tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
  384. for i := len(hashes) - 1; i >= 0; i-- {
  385. if i != skip {
  386. tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
  387. time.Sleep(time.Millisecond)
  388. }
  389. }
  390. // Finally announce the skipped entry and check full import
  391. tester.fetcher.Notify("valid", hashes[skip], uint64(len(hashes)-skip-1), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
  392. verifyImportCount(t, imported, len(hashes)-1)
  393. }
  394. // Tests that direct block enqueues (due to block propagation vs. hash announce)
  395. // are correctly schedule, filling and import queue gaps.
  396. func TestQueueGapFill62(t *testing.T) { testQueueGapFill(t, 62) }
  397. func TestQueueGapFill63(t *testing.T) { testQueueGapFill(t, 63) }
  398. func TestQueueGapFill64(t *testing.T) { testQueueGapFill(t, 64) }
  399. func testQueueGapFill(t *testing.T, protocol int) {
  400. // Create a chain of blocks to import, and choose one to not announce at all
  401. targetBlocks := maxQueueDist
  402. hashes, blocks := makeChain(targetBlocks, 0, genesis)
  403. skip := targetBlocks / 2
  404. tester := newTester()
  405. headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack)
  406. bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0)
  407. // Iteratively announce blocks, skipping one entry
  408. imported := make(chan *types.Block, len(hashes)-1)
  409. tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
  410. for i := len(hashes) - 1; i >= 0; i-- {
  411. if i != skip {
  412. tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
  413. time.Sleep(time.Millisecond)
  414. }
  415. }
  416. // Fill the missing block directly as if propagated
  417. tester.fetcher.Enqueue("valid", blocks[hashes[skip]])
  418. verifyImportCount(t, imported, len(hashes)-1)
  419. }
  420. // Tests that blocks arriving from various sources (multiple propagations, hash
  421. // announces, etc) do not get scheduled for import multiple times.
  422. func TestImportDeduplication62(t *testing.T) { testImportDeduplication(t, 62) }
  423. func TestImportDeduplication63(t *testing.T) { testImportDeduplication(t, 63) }
  424. func TestImportDeduplication64(t *testing.T) { testImportDeduplication(t, 64) }
  425. func testImportDeduplication(t *testing.T, protocol int) {
  426. // Create two blocks to import (one for duplication, the other for stalling)
  427. hashes, blocks := makeChain(2, 0, genesis)
  428. // Create the tester and wrap the importer with a counter
  429. tester := newTester()
  430. headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack)
  431. bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0)
  432. counter := uint32(0)
  433. tester.fetcher.insertChain = func(blocks types.Blocks) (int, error) {
  434. atomic.AddUint32(&counter, uint32(len(blocks)))
  435. return tester.insertChain(blocks)
  436. }
  437. // Instrument the fetching and imported events
  438. fetching := make(chan []common.Hash)
  439. imported := make(chan *types.Block, len(hashes)-1)
  440. tester.fetcher.fetchingHook = func(hashes []common.Hash) { fetching <- hashes }
  441. tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
  442. // Announce the duplicating block, wait for retrieval, and also propagate directly
  443. tester.fetcher.Notify("valid", hashes[0], 1, time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
  444. <-fetching
  445. tester.fetcher.Enqueue("valid", blocks[hashes[0]])
  446. tester.fetcher.Enqueue("valid", blocks[hashes[0]])
  447. tester.fetcher.Enqueue("valid", blocks[hashes[0]])
  448. // Fill the missing block directly as if propagated, and check import uniqueness
  449. tester.fetcher.Enqueue("valid", blocks[hashes[1]])
  450. verifyImportCount(t, imported, 2)
  451. if counter != 2 {
  452. t.Fatalf("import invocation count mismatch: have %v, want %v", counter, 2)
  453. }
  454. }
  455. // Tests that blocks with numbers much lower or higher than out current head get
  456. // discarded to prevent wasting resources on useless blocks from faulty peers.
  457. func TestDistantPropagationDiscarding(t *testing.T) {
  458. // Create a long chain to import and define the discard boundaries
  459. hashes, blocks := makeChain(3*maxQueueDist, 0, genesis)
  460. head := hashes[len(hashes)/2]
  461. low, high := len(hashes)/2+maxUncleDist+1, len(hashes)/2-maxQueueDist-1
  462. // Create a tester and simulate a head block being the middle of the above chain
  463. tester := newTester()
  464. tester.lock.Lock()
  465. tester.hashes = []common.Hash{head}
  466. tester.blocks = map[common.Hash]*types.Block{head: blocks[head]}
  467. tester.lock.Unlock()
  468. // Ensure that a block with a lower number than the threshold is discarded
  469. tester.fetcher.Enqueue("lower", blocks[hashes[low]])
  470. time.Sleep(10 * time.Millisecond)
  471. if !tester.fetcher.queue.Empty() {
  472. t.Fatalf("fetcher queued stale block")
  473. }
  474. // Ensure that a block with a higher number than the threshold is discarded
  475. tester.fetcher.Enqueue("higher", blocks[hashes[high]])
  476. time.Sleep(10 * time.Millisecond)
  477. if !tester.fetcher.queue.Empty() {
  478. t.Fatalf("fetcher queued future block")
  479. }
  480. }
  481. // Tests that announcements with numbers much lower or higher than out current
  482. // head get discarded to prevent wasting resources on useless blocks from faulty
  483. // peers.
  484. func TestDistantAnnouncementDiscarding62(t *testing.T) { testDistantAnnouncementDiscarding(t, 62) }
  485. func TestDistantAnnouncementDiscarding63(t *testing.T) { testDistantAnnouncementDiscarding(t, 63) }
  486. func TestDistantAnnouncementDiscarding64(t *testing.T) { testDistantAnnouncementDiscarding(t, 64) }
  487. func testDistantAnnouncementDiscarding(t *testing.T, protocol int) {
  488. // Create a long chain to import and define the discard boundaries
  489. hashes, blocks := makeChain(3*maxQueueDist, 0, genesis)
  490. head := hashes[len(hashes)/2]
  491. low, high := len(hashes)/2+maxUncleDist+1, len(hashes)/2-maxQueueDist-1
  492. // Create a tester and simulate a head block being the middle of the above chain
  493. tester := newTester()
  494. tester.lock.Lock()
  495. tester.hashes = []common.Hash{head}
  496. tester.blocks = map[common.Hash]*types.Block{head: blocks[head]}
  497. tester.lock.Unlock()
  498. headerFetcher := tester.makeHeaderFetcher("lower", blocks, -gatherSlack)
  499. bodyFetcher := tester.makeBodyFetcher("lower", blocks, 0)
  500. fetching := make(chan struct{}, 2)
  501. tester.fetcher.fetchingHook = func(hashes []common.Hash) { fetching <- struct{}{} }
  502. // Ensure that a block with a lower number than the threshold is discarded
  503. tester.fetcher.Notify("lower", hashes[low], blocks[hashes[low]].NumberU64(), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
  504. select {
  505. case <-time.After(50 * time.Millisecond):
  506. case <-fetching:
  507. t.Fatalf("fetcher requested stale header")
  508. }
  509. // Ensure that a block with a higher number than the threshold is discarded
  510. tester.fetcher.Notify("higher", hashes[high], blocks[hashes[high]].NumberU64(), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
  511. select {
  512. case <-time.After(50 * time.Millisecond):
  513. case <-fetching:
  514. t.Fatalf("fetcher requested future header")
  515. }
  516. }
  517. // Tests that peers announcing blocks with invalid numbers (i.e. not matching
  518. // the headers provided afterwards) get dropped as malicious.
  519. func TestInvalidNumberAnnouncement62(t *testing.T) { testInvalidNumberAnnouncement(t, 62) }
  520. func TestInvalidNumberAnnouncement63(t *testing.T) { testInvalidNumberAnnouncement(t, 63) }
  521. func TestInvalidNumberAnnouncement64(t *testing.T) { testInvalidNumberAnnouncement(t, 64) }
  522. func testInvalidNumberAnnouncement(t *testing.T, protocol int) {
  523. // Create a single block to import and check numbers against
  524. hashes, blocks := makeChain(1, 0, genesis)
  525. tester := newTester()
  526. badHeaderFetcher := tester.makeHeaderFetcher("bad", blocks, -gatherSlack)
  527. badBodyFetcher := tester.makeBodyFetcher("bad", blocks, 0)
  528. imported := make(chan *types.Block)
  529. tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
  530. // Announce a block with a bad number, check for immediate drop
  531. tester.fetcher.Notify("bad", hashes[0], 2, time.Now().Add(-arriveTimeout), badHeaderFetcher, badBodyFetcher)
  532. verifyImportEvent(t, imported, false)
  533. tester.lock.RLock()
  534. dropped := tester.drops["bad"]
  535. tester.lock.RUnlock()
  536. if !dropped {
  537. t.Fatalf("peer with invalid numbered announcement not dropped")
  538. }
  539. goodHeaderFetcher := tester.makeHeaderFetcher("good", blocks, -gatherSlack)
  540. goodBodyFetcher := tester.makeBodyFetcher("good", blocks, 0)
  541. // Make sure a good announcement passes without a drop
  542. tester.fetcher.Notify("good", hashes[0], 1, time.Now().Add(-arriveTimeout), goodHeaderFetcher, goodBodyFetcher)
  543. verifyImportEvent(t, imported, true)
  544. tester.lock.RLock()
  545. dropped = tester.drops["good"]
  546. tester.lock.RUnlock()
  547. if dropped {
  548. t.Fatalf("peer with valid numbered announcement dropped")
  549. }
  550. verifyImportDone(t, imported)
  551. }
  552. // Tests that if a block is empty (i.e. header only), no body request should be
  553. // made, and instead the header should be assembled into a whole block in itself.
  554. func TestEmptyBlockShortCircuit62(t *testing.T) { testEmptyBlockShortCircuit(t, 62) }
  555. func TestEmptyBlockShortCircuit63(t *testing.T) { testEmptyBlockShortCircuit(t, 63) }
  556. func TestEmptyBlockShortCircuit64(t *testing.T) { testEmptyBlockShortCircuit(t, 64) }
  557. func testEmptyBlockShortCircuit(t *testing.T, protocol int) {
  558. // Create a chain of blocks to import
  559. hashes, blocks := makeChain(32, 0, genesis)
  560. tester := newTester()
  561. headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack)
  562. bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0)
  563. // Add a monitoring hook for all internal events
  564. fetching := make(chan []common.Hash)
  565. tester.fetcher.fetchingHook = func(hashes []common.Hash) { fetching <- hashes }
  566. completing := make(chan []common.Hash)
  567. tester.fetcher.completingHook = func(hashes []common.Hash) { completing <- hashes }
  568. imported := make(chan *types.Block)
  569. tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
  570. // Iteratively announce blocks until all are imported
  571. for i := len(hashes) - 2; i >= 0; i-- {
  572. tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
  573. // All announces should fetch the header
  574. verifyFetchingEvent(t, fetching, true)
  575. // Only blocks with data contents should request bodies
  576. verifyCompletingEvent(t, completing, len(blocks[hashes[i]].Transactions()) > 0 || len(blocks[hashes[i]].Uncles()) > 0)
  577. // Irrelevant of the construct, import should succeed
  578. verifyImportEvent(t, imported, true)
  579. }
  580. verifyImportDone(t, imported)
  581. }
// Tests that a peer is unable to use unbounded memory by sending infinite
// block announcements to a node, and that even in the face of such an attack,
// the fetcher remains operational.
func TestHashMemoryExhaustionAttack62(t *testing.T) { testHashMemoryExhaustionAttack(t, 62) }
func TestHashMemoryExhaustionAttack63(t *testing.T) { testHashMemoryExhaustionAttack(t, 63) }
func TestHashMemoryExhaustionAttack64(t *testing.T) { testHashMemoryExhaustionAttack(t, 64) }
  588. func testHashMemoryExhaustionAttack(t *testing.T, protocol int) {
  589. // Create a tester with instrumented import hooks
  590. tester := newTester()
  591. imported, announces := make(chan *types.Block), int32(0)
  592. tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
  593. tester.fetcher.announceChangeHook = func(hash common.Hash, added bool) {
  594. if added {
  595. atomic.AddInt32(&announces, 1)
  596. } else {
  597. atomic.AddInt32(&announces, -1)
  598. }
  599. }
  600. // Create a valid chain and an infinite junk chain
  601. targetBlocks := hashLimit + 2*maxQueueDist
  602. hashes, blocks := makeChain(targetBlocks, 0, genesis)
  603. validHeaderFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack)
  604. validBodyFetcher := tester.makeBodyFetcher("valid", blocks, 0)
  605. attack, _ := makeChain(targetBlocks, 0, unknownBlock)
  606. attackerHeaderFetcher := tester.makeHeaderFetcher("attacker", nil, -gatherSlack)
  607. attackerBodyFetcher := tester.makeBodyFetcher("attacker", nil, 0)
  608. // Feed the tester a huge hashset from the attacker, and a limited from the valid peer
  609. for i := 0; i < len(attack); i++ {
  610. if i < maxQueueDist {
  611. tester.fetcher.Notify("valid", hashes[len(hashes)-2-i], uint64(i+1), time.Now(), validHeaderFetcher, validBodyFetcher)
  612. }
  613. tester.fetcher.Notify("attacker", attack[i], 1 /* don't distance drop */, time.Now(), attackerHeaderFetcher, attackerBodyFetcher)
  614. }
  615. if count := atomic.LoadInt32(&announces); count != hashLimit+maxQueueDist {
  616. t.Fatalf("queued announce count mismatch: have %d, want %d", count, hashLimit+maxQueueDist)
  617. }
  618. // Wait for fetches to complete
  619. verifyImportCount(t, imported, maxQueueDist)
  620. // Feed the remaining valid hashes to ensure DOS protection state remains clean
  621. for i := len(hashes) - maxQueueDist - 2; i >= 0; i-- {
  622. tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), validHeaderFetcher, validBodyFetcher)
  623. verifyImportEvent(t, imported, true)
  624. }
  625. verifyImportDone(t, imported)
  626. }
  627. // Tests that blocks sent to the fetcher (either through propagation or via hash
  628. // announces and retrievals) don't pile up indefinitely, exhausting available
  629. // system memory.
  630. func TestBlockMemoryExhaustionAttack(t *testing.T) {
  631. // Create a tester with instrumented import hooks
  632. tester := newTester()
  633. imported, enqueued := make(chan *types.Block), int32(0)
  634. tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
  635. tester.fetcher.queueChangeHook = func(hash common.Hash, added bool) {
  636. if added {
  637. atomic.AddInt32(&enqueued, 1)
  638. } else {
  639. atomic.AddInt32(&enqueued, -1)
  640. }
  641. }
  642. // Create a valid chain and a batch of dangling (but in range) blocks
  643. targetBlocks := hashLimit + 2*maxQueueDist
  644. hashes, blocks := makeChain(targetBlocks, 0, genesis)
  645. attack := make(map[common.Hash]*types.Block)
  646. for i := byte(0); len(attack) < blockLimit+2*maxQueueDist; i++ {
  647. hashes, blocks := makeChain(maxQueueDist-1, i, unknownBlock)
  648. for _, hash := range hashes[:maxQueueDist-2] {
  649. attack[hash] = blocks[hash]
  650. }
  651. }
  652. // Try to feed all the attacker blocks make sure only a limited batch is accepted
  653. for _, block := range attack {
  654. tester.fetcher.Enqueue("attacker", block)
  655. }
  656. time.Sleep(200 * time.Millisecond)
  657. if queued := atomic.LoadInt32(&enqueued); queued != blockLimit {
  658. t.Fatalf("queued block count mismatch: have %d, want %d", queued, blockLimit)
  659. }
  660. // Queue up a batch of valid blocks, and check that a new peer is allowed to do so
  661. for i := 0; i < maxQueueDist-1; i++ {
  662. tester.fetcher.Enqueue("valid", blocks[hashes[len(hashes)-3-i]])
  663. }
  664. time.Sleep(100 * time.Millisecond)
  665. if queued := atomic.LoadInt32(&enqueued); queued != blockLimit+maxQueueDist-1 {
  666. t.Fatalf("queued block count mismatch: have %d, want %d", queued, blockLimit+maxQueueDist-1)
  667. }
  668. // Insert the missing piece (and sanity check the import)
  669. tester.fetcher.Enqueue("valid", blocks[hashes[len(hashes)-2]])
  670. verifyImportCount(t, imported, maxQueueDist)
  671. // Insert the remaining blocks in chunks to ensure clean DOS protection
  672. for i := maxQueueDist; i < len(hashes)-1; i++ {
  673. tester.fetcher.Enqueue("valid", blocks[hashes[len(hashes)-2-i]])
  674. verifyImportEvent(t, imported, true)
  675. }
  676. verifyImportDone(t, imported)
  677. }