plugeth/eth/fetcher/block_fetcher_test.go
ucwong 83e4c49e2b
trie : use trie.NewStackTrie instead of new(trie.Trie) ()
The PR makes use of the stacktrie, which is is more lenient on resource consumption, than the regular trie, in cases where we only need it for DeriveSha
2021-02-02 13:09:23 +01:00

882 lines
32 KiB
Go

// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package fetcher
import (
"errors"
"math/big"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus/ethash"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/trie"
)
var (
testdb = rawdb.NewMemoryDatabase()
testKey, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
testAddress = crypto.PubkeyToAddress(testKey.PublicKey)
genesis = core.GenesisBlockForTesting(testdb, testAddress, big.NewInt(1000000000))
unknownBlock = types.NewBlock(&types.Header{GasLimit: params.GenesisGasLimit}, nil, nil, nil, trie.NewStackTrie(nil))
)
// makeChain creates a chain of n blocks starting at and including parent.
// the returned hash chain is ordered head->parent. In addition, every 3rd block
// contains a transaction and every 5th an uncle to allow testing correct block
// reassembly.
func makeChain(n int, seed byte, parent *types.Block) ([]common.Hash, map[common.Hash]*types.Block) {
blocks, _ := core.GenerateChain(params.TestChainConfig, parent, ethash.NewFaker(), testdb, n, func(i int, block *core.BlockGen) {
block.SetCoinbase(common.Address{seed})
// If the block number is multiple of 3, send a bonus transaction to the miner
if parent == genesis && i%3 == 0 {
signer := types.MakeSigner(params.TestChainConfig, block.Number())
tx, err := types.SignTx(types.NewTransaction(block.TxNonce(testAddress), common.Address{seed}, big.NewInt(1000), params.TxGas, nil, nil), signer, testKey)
if err != nil {
panic(err)
}
block.AddTx(tx)
}
// If the block number is a multiple of 5, add a bonus uncle to the block
if i%5 == 0 {
block.AddUncle(&types.Header{ParentHash: block.PrevBlock(i - 1).Hash(), Number: big.NewInt(int64(i - 1))})
}
})
hashes := make([]common.Hash, n+1)
hashes[len(hashes)-1] = parent.Hash()
blockm := make(map[common.Hash]*types.Block, n+1)
blockm[parent.Hash()] = parent
for i, b := range blocks {
hashes[len(hashes)-i-2] = b.Hash()
blockm[b.Hash()] = b
}
return hashes, blockm
}
// fetcherTester is a test simulator for mocking out local block chain.
type fetcherTester struct {
fetcher *BlockFetcher
hashes []common.Hash // Hash chain belonging to the tester
headers map[common.Hash]*types.Header // Headers belonging to the tester
blocks map[common.Hash]*types.Block // Blocks belonging to the tester
drops map[string]bool // Map of peers dropped by the fetcher
lock sync.RWMutex
}
// newTester creates a new fetcher test mocker.
func newTester(light bool) *fetcherTester {
tester := &fetcherTester{
hashes: []common.Hash{genesis.Hash()},
headers: map[common.Hash]*types.Header{genesis.Hash(): genesis.Header()},
blocks: map[common.Hash]*types.Block{genesis.Hash(): genesis},
drops: make(map[string]bool),
}
tester.fetcher = NewBlockFetcher(light, tester.getHeader, tester.getBlock, tester.verifyHeader, tester.broadcastBlock, tester.chainHeight, tester.insertHeaders, tester.insertChain, tester.dropPeer)
tester.fetcher.Start()
return tester
}
// getHeader retrieves a header from the tester's block chain.
func (f *fetcherTester) getHeader(hash common.Hash) *types.Header {
f.lock.RLock()
defer f.lock.RUnlock()
return f.headers[hash]
}
// getBlock retrieves a block from the tester's block chain.
func (f *fetcherTester) getBlock(hash common.Hash) *types.Block {
f.lock.RLock()
defer f.lock.RUnlock()
return f.blocks[hash]
}
// verifyHeader is a nop placeholder for the block header verification.
func (f *fetcherTester) verifyHeader(header *types.Header) error {
return nil
}
// broadcastBlock is a nop placeholder for the block broadcasting.
func (f *fetcherTester) broadcastBlock(block *types.Block, propagate bool) {
}
// chainHeight retrieves the current height (block number) of the chain.
func (f *fetcherTester) chainHeight() uint64 {
f.lock.RLock()
defer f.lock.RUnlock()
if f.fetcher.light {
return f.headers[f.hashes[len(f.hashes)-1]].Number.Uint64()
}
return f.blocks[f.hashes[len(f.hashes)-1]].NumberU64()
}
// insertChain injects a new headers into the simulated chain.
func (f *fetcherTester) insertHeaders(headers []*types.Header) (int, error) {
f.lock.Lock()
defer f.lock.Unlock()
for i, header := range headers {
// Make sure the parent in known
if _, ok := f.headers[header.ParentHash]; !ok {
return i, errors.New("unknown parent")
}
// Discard any new blocks if the same height already exists
if header.Number.Uint64() <= f.headers[f.hashes[len(f.hashes)-1]].Number.Uint64() {
return i, nil
}
// Otherwise build our current chain
f.hashes = append(f.hashes, header.Hash())
f.headers[header.Hash()] = header
}
return 0, nil
}
// insertChain injects a new blocks into the simulated chain.
func (f *fetcherTester) insertChain(blocks types.Blocks) (int, error) {
f.lock.Lock()
defer f.lock.Unlock()
for i, block := range blocks {
// Make sure the parent in known
if _, ok := f.blocks[block.ParentHash()]; !ok {
return i, errors.New("unknown parent")
}
// Discard any new blocks if the same height already exists
if block.NumberU64() <= f.blocks[f.hashes[len(f.hashes)-1]].NumberU64() {
return i, nil
}
// Otherwise build our current chain
f.hashes = append(f.hashes, block.Hash())
f.blocks[block.Hash()] = block
}
return 0, nil
}
// dropPeer is an emulator for the peer removal, simply accumulating the various
// peers dropped by the fetcher.
func (f *fetcherTester) dropPeer(peer string) {
f.lock.Lock()
defer f.lock.Unlock()
f.drops[peer] = true
}
// makeHeaderFetcher retrieves a block header fetcher associated with a simulated peer.
func (f *fetcherTester) makeHeaderFetcher(peer string, blocks map[common.Hash]*types.Block, drift time.Duration) headerRequesterFn {
closure := make(map[common.Hash]*types.Block)
for hash, block := range blocks {
closure[hash] = block
}
// Create a function that return a header from the closure
return func(hash common.Hash) error {
// Gather the blocks to return
headers := make([]*types.Header, 0, 1)
if block, ok := closure[hash]; ok {
headers = append(headers, block.Header())
}
// Return on a new thread
go f.fetcher.FilterHeaders(peer, headers, time.Now().Add(drift))
return nil
}
}
// makeBodyFetcher retrieves a block body fetcher associated with a simulated peer.
func (f *fetcherTester) makeBodyFetcher(peer string, blocks map[common.Hash]*types.Block, drift time.Duration) bodyRequesterFn {
closure := make(map[common.Hash]*types.Block)
for hash, block := range blocks {
closure[hash] = block
}
// Create a function that returns blocks from the closure
return func(hashes []common.Hash) error {
// Gather the block bodies to return
transactions := make([][]*types.Transaction, 0, len(hashes))
uncles := make([][]*types.Header, 0, len(hashes))
for _, hash := range hashes {
if block, ok := closure[hash]; ok {
transactions = append(transactions, block.Transactions())
uncles = append(uncles, block.Uncles())
}
}
// Return on a new thread
go f.fetcher.FilterBodies(peer, transactions, uncles, time.Now().Add(drift))
return nil
}
}
// verifyFetchingEvent verifies that one single event arrive on a fetching channel.
func verifyFetchingEvent(t *testing.T, fetching chan []common.Hash, arrive bool) {
if arrive {
select {
case <-fetching:
case <-time.After(time.Second):
t.Fatalf("fetching timeout")
}
} else {
select {
case <-fetching:
t.Fatalf("fetching invoked")
case <-time.After(10 * time.Millisecond):
}
}
}
// verifyCompletingEvent verifies that one single event arrive on an completing channel.
func verifyCompletingEvent(t *testing.T, completing chan []common.Hash, arrive bool) {
if arrive {
select {
case <-completing:
case <-time.After(time.Second):
t.Fatalf("completing timeout")
}
} else {
select {
case <-completing:
t.Fatalf("completing invoked")
case <-time.After(10 * time.Millisecond):
}
}
}
// verifyImportEvent verifies that one single event arrive on an import channel.
func verifyImportEvent(t *testing.T, imported chan interface{}, arrive bool) {
if arrive {
select {
case <-imported:
case <-time.After(time.Second):
t.Fatalf("import timeout")
}
} else {
select {
case <-imported:
t.Fatalf("import invoked")
case <-time.After(20 * time.Millisecond):
}
}
}
// verifyImportCount verifies that exactly count number of events arrive on an
// import hook channel.
func verifyImportCount(t *testing.T, imported chan interface{}, count int) {
for i := 0; i < count; i++ {
select {
case <-imported:
case <-time.After(time.Second):
t.Fatalf("block %d: import timeout", i+1)
}
}
verifyImportDone(t, imported)
}
// verifyImportDone verifies that no more events are arriving on an import channel.
func verifyImportDone(t *testing.T, imported chan interface{}) {
select {
case <-imported:
t.Fatalf("extra block imported")
case <-time.After(50 * time.Millisecond):
}
}
// verifyChainHeight verifies the chain height is as expected.
func verifyChainHeight(t *testing.T, fetcher *fetcherTester, height uint64) {
if fetcher.chainHeight() != height {
t.Fatalf("chain height mismatch, got %d, want %d", fetcher.chainHeight(), height)
}
}
// Tests that a fetcher accepts block/header announcements and initiates retrievals
// for them, successfully importing into the local chain.
func TestFullSequentialAnnouncements(t *testing.T) { testSequentialAnnouncements(t, false) }
func TestLightSequentialAnnouncements(t *testing.T) { testSequentialAnnouncements(t, true) }
func testSequentialAnnouncements(t *testing.T, light bool) {
// Create a chain of blocks to import
targetBlocks := 4 * hashLimit
hashes, blocks := makeChain(targetBlocks, 0, genesis)
tester := newTester(light)
headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack)
bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0)
// Iteratively announce blocks until all are imported
imported := make(chan interface{})
tester.fetcher.importedHook = func(header *types.Header, block *types.Block) {
if light {
if header == nil {
t.Fatalf("Fetcher try to import empty header")
}
imported <- header
} else {
if block == nil {
t.Fatalf("Fetcher try to import empty block")
}
imported <- block
}
}
for i := len(hashes) - 2; i >= 0; i-- {
tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
verifyImportEvent(t, imported, true)
}
verifyImportDone(t, imported)
verifyChainHeight(t, tester, uint64(len(hashes)-1))
}
// Tests that if blocks are announced by multiple peers (or even the same buggy
// peer), they will only get downloaded at most once.
func TestFullConcurrentAnnouncements(t *testing.T) { testConcurrentAnnouncements(t, false) }
func TestLightConcurrentAnnouncements(t *testing.T) { testConcurrentAnnouncements(t, true) }
func testConcurrentAnnouncements(t *testing.T, light bool) {
// Create a chain of blocks to import
targetBlocks := 4 * hashLimit
hashes, blocks := makeChain(targetBlocks, 0, genesis)
// Assemble a tester with a built in counter for the requests
tester := newTester(light)
firstHeaderFetcher := tester.makeHeaderFetcher("first", blocks, -gatherSlack)
firstBodyFetcher := tester.makeBodyFetcher("first", blocks, 0)
secondHeaderFetcher := tester.makeHeaderFetcher("second", blocks, -gatherSlack)
secondBodyFetcher := tester.makeBodyFetcher("second", blocks, 0)
counter := uint32(0)
firstHeaderWrapper := func(hash common.Hash) error {
atomic.AddUint32(&counter, 1)
return firstHeaderFetcher(hash)
}
secondHeaderWrapper := func(hash common.Hash) error {
atomic.AddUint32(&counter, 1)
return secondHeaderFetcher(hash)
}
// Iteratively announce blocks until all are imported
imported := make(chan interface{})
tester.fetcher.importedHook = func(header *types.Header, block *types.Block) {
if light {
if header == nil {
t.Fatalf("Fetcher try to import empty header")
}
imported <- header
} else {
if block == nil {
t.Fatalf("Fetcher try to import empty block")
}
imported <- block
}
}
for i := len(hashes) - 2; i >= 0; i-- {
tester.fetcher.Notify("first", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), firstHeaderWrapper, firstBodyFetcher)
tester.fetcher.Notify("second", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout+time.Millisecond), secondHeaderWrapper, secondBodyFetcher)
tester.fetcher.Notify("second", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout-time.Millisecond), secondHeaderWrapper, secondBodyFetcher)
verifyImportEvent(t, imported, true)
}
verifyImportDone(t, imported)
// Make sure no blocks were retrieved twice
if int(counter) != targetBlocks {
t.Fatalf("retrieval count mismatch: have %v, want %v", counter, targetBlocks)
}
verifyChainHeight(t, tester, uint64(len(hashes)-1))
}
// Tests that announcements arriving while a previous is being fetched still
// results in a valid import.
func TestFullOverlappingAnnouncements(t *testing.T) { testOverlappingAnnouncements(t, false) }
func TestLightOverlappingAnnouncements(t *testing.T) { testOverlappingAnnouncements(t, true) }
func testOverlappingAnnouncements(t *testing.T, light bool) {
// Create a chain of blocks to import
targetBlocks := 4 * hashLimit
hashes, blocks := makeChain(targetBlocks, 0, genesis)
tester := newTester(light)
headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack)
bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0)
// Iteratively announce blocks, but overlap them continuously
overlap := 16
imported := make(chan interface{}, len(hashes)-1)
for i := 0; i < overlap; i++ {
imported <- nil
}
tester.fetcher.importedHook = func(header *types.Header, block *types.Block) {
if light {
if header == nil {
t.Fatalf("Fetcher try to import empty header")
}
imported <- header
} else {
if block == nil {
t.Fatalf("Fetcher try to import empty block")
}
imported <- block
}
}
for i := len(hashes) - 2; i >= 0; i-- {
tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
select {
case <-imported:
case <-time.After(time.Second):
t.Fatalf("block %d: import timeout", len(hashes)-i)
}
}
// Wait for all the imports to complete and check count
verifyImportCount(t, imported, overlap)
verifyChainHeight(t, tester, uint64(len(hashes)-1))
}
// Tests that announces already being retrieved will not be duplicated.
func TestFullPendingDeduplication(t *testing.T) { testPendingDeduplication(t, false) }
func TestLightPendingDeduplication(t *testing.T) { testPendingDeduplication(t, true) }
func testPendingDeduplication(t *testing.T, light bool) {
// Create a hash and corresponding block
hashes, blocks := makeChain(1, 0, genesis)
// Assemble a tester with a built in counter and delayed fetcher
tester := newTester(light)
headerFetcher := tester.makeHeaderFetcher("repeater", blocks, -gatherSlack)
bodyFetcher := tester.makeBodyFetcher("repeater", blocks, 0)
delay := 50 * time.Millisecond
counter := uint32(0)
headerWrapper := func(hash common.Hash) error {
atomic.AddUint32(&counter, 1)
// Simulate a long running fetch
go func() {
time.Sleep(delay)
headerFetcher(hash)
}()
return nil
}
checkNonExist := func() bool {
return tester.getBlock(hashes[0]) == nil
}
if light {
checkNonExist = func() bool {
return tester.getHeader(hashes[0]) == nil
}
}
// Announce the same block many times until it's fetched (wait for any pending ops)
for checkNonExist() {
tester.fetcher.Notify("repeater", hashes[0], 1, time.Now().Add(-arriveTimeout), headerWrapper, bodyFetcher)
time.Sleep(time.Millisecond)
}
time.Sleep(delay)
// Check that all blocks were imported and none fetched twice
if int(counter) != 1 {
t.Fatalf("retrieval count mismatch: have %v, want %v", counter, 1)
}
verifyChainHeight(t, tester, 1)
}
// Tests that announcements retrieved in a random order are cached and eventually
// imported when all the gaps are filled in.
func TestFullRandomArrivalImport(t *testing.T) { testRandomArrivalImport(t, false) }
func TestLightRandomArrivalImport(t *testing.T) { testRandomArrivalImport(t, true) }
func testRandomArrivalImport(t *testing.T, light bool) {
// Create a chain of blocks to import, and choose one to delay
targetBlocks := maxQueueDist
hashes, blocks := makeChain(targetBlocks, 0, genesis)
skip := targetBlocks / 2
tester := newTester(light)
headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack)
bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0)
// Iteratively announce blocks, skipping one entry
imported := make(chan interface{}, len(hashes)-1)
tester.fetcher.importedHook = func(header *types.Header, block *types.Block) {
if light {
if header == nil {
t.Fatalf("Fetcher try to import empty header")
}
imported <- header
} else {
if block == nil {
t.Fatalf("Fetcher try to import empty block")
}
imported <- block
}
}
for i := len(hashes) - 1; i >= 0; i-- {
if i != skip {
tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
time.Sleep(time.Millisecond)
}
}
// Finally announce the skipped entry and check full import
tester.fetcher.Notify("valid", hashes[skip], uint64(len(hashes)-skip-1), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
verifyImportCount(t, imported, len(hashes)-1)
verifyChainHeight(t, tester, uint64(len(hashes)-1))
}
// Tests that direct block enqueues (due to block propagation vs. hash announce)
// are correctly schedule, filling and import queue gaps.
func TestQueueGapFill(t *testing.T) {
// Create a chain of blocks to import, and choose one to not announce at all
targetBlocks := maxQueueDist
hashes, blocks := makeChain(targetBlocks, 0, genesis)
skip := targetBlocks / 2
tester := newTester(false)
headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack)
bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0)
// Iteratively announce blocks, skipping one entry
imported := make(chan interface{}, len(hashes)-1)
tester.fetcher.importedHook = func(header *types.Header, block *types.Block) { imported <- block }
for i := len(hashes) - 1; i >= 0; i-- {
if i != skip {
tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
time.Sleep(time.Millisecond)
}
}
// Fill the missing block directly as if propagated
tester.fetcher.Enqueue("valid", blocks[hashes[skip]])
verifyImportCount(t, imported, len(hashes)-1)
verifyChainHeight(t, tester, uint64(len(hashes)-1))
}
// Tests that blocks arriving from various sources (multiple propagations, hash
// announces, etc) do not get scheduled for import multiple times.
func TestImportDeduplication(t *testing.T) {
// Create two blocks to import (one for duplication, the other for stalling)
hashes, blocks := makeChain(2, 0, genesis)
// Create the tester and wrap the importer with a counter
tester := newTester(false)
headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack)
bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0)
counter := uint32(0)
tester.fetcher.insertChain = func(blocks types.Blocks) (int, error) {
atomic.AddUint32(&counter, uint32(len(blocks)))
return tester.insertChain(blocks)
}
// Instrument the fetching and imported events
fetching := make(chan []common.Hash)
imported := make(chan interface{}, len(hashes)-1)
tester.fetcher.fetchingHook = func(hashes []common.Hash) { fetching <- hashes }
tester.fetcher.importedHook = func(header *types.Header, block *types.Block) { imported <- block }
// Announce the duplicating block, wait for retrieval, and also propagate directly
tester.fetcher.Notify("valid", hashes[0], 1, time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
<-fetching
tester.fetcher.Enqueue("valid", blocks[hashes[0]])
tester.fetcher.Enqueue("valid", blocks[hashes[0]])
tester.fetcher.Enqueue("valid", blocks[hashes[0]])
// Fill the missing block directly as if propagated, and check import uniqueness
tester.fetcher.Enqueue("valid", blocks[hashes[1]])
verifyImportCount(t, imported, 2)
if counter != 2 {
t.Fatalf("import invocation count mismatch: have %v, want %v", counter, 2)
}
}
// Tests that blocks with numbers much lower or higher than out current head get
// discarded to prevent wasting resources on useless blocks from faulty peers.
func TestDistantPropagationDiscarding(t *testing.T) {
// Create a long chain to import and define the discard boundaries
hashes, blocks := makeChain(3*maxQueueDist, 0, genesis)
head := hashes[len(hashes)/2]
low, high := len(hashes)/2+maxUncleDist+1, len(hashes)/2-maxQueueDist-1
// Create a tester and simulate a head block being the middle of the above chain
tester := newTester(false)
tester.lock.Lock()
tester.hashes = []common.Hash{head}
tester.blocks = map[common.Hash]*types.Block{head: blocks[head]}
tester.lock.Unlock()
// Ensure that a block with a lower number than the threshold is discarded
tester.fetcher.Enqueue("lower", blocks[hashes[low]])
time.Sleep(10 * time.Millisecond)
if !tester.fetcher.queue.Empty() {
t.Fatalf("fetcher queued stale block")
}
// Ensure that a block with a higher number than the threshold is discarded
tester.fetcher.Enqueue("higher", blocks[hashes[high]])
time.Sleep(10 * time.Millisecond)
if !tester.fetcher.queue.Empty() {
t.Fatalf("fetcher queued future block")
}
}
// Tests that announcements with numbers much lower or higher than out current
// head get discarded to prevent wasting resources on useless blocks from faulty
// peers.
func TestFullDistantAnnouncementDiscarding(t *testing.T) { testDistantAnnouncementDiscarding(t, false) }
func TestLightDistantAnnouncementDiscarding(t *testing.T) { testDistantAnnouncementDiscarding(t, true) }
func testDistantAnnouncementDiscarding(t *testing.T, light bool) {
// Create a long chain to import and define the discard boundaries
hashes, blocks := makeChain(3*maxQueueDist, 0, genesis)
head := hashes[len(hashes)/2]
low, high := len(hashes)/2+maxUncleDist+1, len(hashes)/2-maxQueueDist-1
// Create a tester and simulate a head block being the middle of the above chain
tester := newTester(light)
tester.lock.Lock()
tester.hashes = []common.Hash{head}
tester.headers = map[common.Hash]*types.Header{head: blocks[head].Header()}
tester.blocks = map[common.Hash]*types.Block{head: blocks[head]}
tester.lock.Unlock()
headerFetcher := tester.makeHeaderFetcher("lower", blocks, -gatherSlack)
bodyFetcher := tester.makeBodyFetcher("lower", blocks, 0)
fetching := make(chan struct{}, 2)
tester.fetcher.fetchingHook = func(hashes []common.Hash) { fetching <- struct{}{} }
// Ensure that a block with a lower number than the threshold is discarded
tester.fetcher.Notify("lower", hashes[low], blocks[hashes[low]].NumberU64(), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
select {
case <-time.After(50 * time.Millisecond):
case <-fetching:
t.Fatalf("fetcher requested stale header")
}
// Ensure that a block with a higher number than the threshold is discarded
tester.fetcher.Notify("higher", hashes[high], blocks[hashes[high]].NumberU64(), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
select {
case <-time.After(50 * time.Millisecond):
case <-fetching:
t.Fatalf("fetcher requested future header")
}
}
// Tests that peers announcing blocks with invalid numbers (i.e. not matching
// the headers provided afterwards) get dropped as malicious.
func TestFullInvalidNumberAnnouncement(t *testing.T) { testInvalidNumberAnnouncement(t, false) }
func TestLightInvalidNumberAnnouncement(t *testing.T) { testInvalidNumberAnnouncement(t, true) }
func testInvalidNumberAnnouncement(t *testing.T, light bool) {
// Create a single block to import and check numbers against
hashes, blocks := makeChain(1, 0, genesis)
tester := newTester(light)
badHeaderFetcher := tester.makeHeaderFetcher("bad", blocks, -gatherSlack)
badBodyFetcher := tester.makeBodyFetcher("bad", blocks, 0)
imported := make(chan interface{})
tester.fetcher.importedHook = func(header *types.Header, block *types.Block) {
if light {
if header == nil {
t.Fatalf("Fetcher try to import empty header")
}
imported <- header
} else {
if block == nil {
t.Fatalf("Fetcher try to import empty block")
}
imported <- block
}
}
// Announce a block with a bad number, check for immediate drop
tester.fetcher.Notify("bad", hashes[0], 2, time.Now().Add(-arriveTimeout), badHeaderFetcher, badBodyFetcher)
verifyImportEvent(t, imported, false)
tester.lock.RLock()
dropped := tester.drops["bad"]
tester.lock.RUnlock()
if !dropped {
t.Fatalf("peer with invalid numbered announcement not dropped")
}
goodHeaderFetcher := tester.makeHeaderFetcher("good", blocks, -gatherSlack)
goodBodyFetcher := tester.makeBodyFetcher("good", blocks, 0)
// Make sure a good announcement passes without a drop
tester.fetcher.Notify("good", hashes[0], 1, time.Now().Add(-arriveTimeout), goodHeaderFetcher, goodBodyFetcher)
verifyImportEvent(t, imported, true)
tester.lock.RLock()
dropped = tester.drops["good"]
tester.lock.RUnlock()
if dropped {
t.Fatalf("peer with valid numbered announcement dropped")
}
verifyImportDone(t, imported)
}
// Tests that if a block is empty (i.e. header only), no body request should be
// made, and instead the header should be assembled into a whole block in itself.
func TestEmptyBlockShortCircuit(t *testing.T) {
// Create a chain of blocks to import
hashes, blocks := makeChain(32, 0, genesis)
tester := newTester(false)
headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack)
bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0)
// Add a monitoring hook for all internal events
fetching := make(chan []common.Hash)
tester.fetcher.fetchingHook = func(hashes []common.Hash) { fetching <- hashes }
completing := make(chan []common.Hash)
tester.fetcher.completingHook = func(hashes []common.Hash) { completing <- hashes }
imported := make(chan interface{})
tester.fetcher.importedHook = func(header *types.Header, block *types.Block) {
if block == nil {
t.Fatalf("Fetcher try to import empty block")
}
imported <- block
}
// Iteratively announce blocks until all are imported
for i := len(hashes) - 2; i >= 0; i-- {
tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
// All announces should fetch the header
verifyFetchingEvent(t, fetching, true)
// Only blocks with data contents should request bodies
verifyCompletingEvent(t, completing, len(blocks[hashes[i]].Transactions()) > 0 || len(blocks[hashes[i]].Uncles()) > 0)
// Irrelevant of the construct, import should succeed
verifyImportEvent(t, imported, true)
}
verifyImportDone(t, imported)
}
// Tests that a peer is unable to use unbounded memory with sending infinite
// block announcements to a node, but that even in the face of such an attack,
// the fetcher remains operational.
func TestHashMemoryExhaustionAttack(t *testing.T) {
// Create a tester with instrumented import hooks
tester := newTester(false)
imported, announces := make(chan interface{}), int32(0)
tester.fetcher.importedHook = func(header *types.Header, block *types.Block) { imported <- block }
tester.fetcher.announceChangeHook = func(hash common.Hash, added bool) {
if added {
atomic.AddInt32(&announces, 1)
} else {
atomic.AddInt32(&announces, -1)
}
}
// Create a valid chain and an infinite junk chain
targetBlocks := hashLimit + 2*maxQueueDist
hashes, blocks := makeChain(targetBlocks, 0, genesis)
validHeaderFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack)
validBodyFetcher := tester.makeBodyFetcher("valid", blocks, 0)
attack, _ := makeChain(targetBlocks, 0, unknownBlock)
attackerHeaderFetcher := tester.makeHeaderFetcher("attacker", nil, -gatherSlack)
attackerBodyFetcher := tester.makeBodyFetcher("attacker", nil, 0)
// Feed the tester a huge hashset from the attacker, and a limited from the valid peer
for i := 0; i < len(attack); i++ {
if i < maxQueueDist {
tester.fetcher.Notify("valid", hashes[len(hashes)-2-i], uint64(i+1), time.Now(), validHeaderFetcher, validBodyFetcher)
}
tester.fetcher.Notify("attacker", attack[i], 1 /* don't distance drop */, time.Now(), attackerHeaderFetcher, attackerBodyFetcher)
}
if count := atomic.LoadInt32(&announces); count != hashLimit+maxQueueDist {
t.Fatalf("queued announce count mismatch: have %d, want %d", count, hashLimit+maxQueueDist)
}
// Wait for fetches to complete
verifyImportCount(t, imported, maxQueueDist)
// Feed the remaining valid hashes to ensure DOS protection state remains clean
for i := len(hashes) - maxQueueDist - 2; i >= 0; i-- {
tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), validHeaderFetcher, validBodyFetcher)
verifyImportEvent(t, imported, true)
}
verifyImportDone(t, imported)
}
// Tests that blocks sent to the fetcher (either through propagation or via hash
// announces and retrievals) don't pile up indefinitely, exhausting available
// system memory.
func TestBlockMemoryExhaustionAttack(t *testing.T) {
// Create a tester with instrumented import hooks
tester := newTester(false)
imported, enqueued := make(chan interface{}), int32(0)
tester.fetcher.importedHook = func(header *types.Header, block *types.Block) { imported <- block }
tester.fetcher.queueChangeHook = func(hash common.Hash, added bool) {
if added {
atomic.AddInt32(&enqueued, 1)
} else {
atomic.AddInt32(&enqueued, -1)
}
}
// Create a valid chain and a batch of dangling (but in range) blocks
targetBlocks := hashLimit + 2*maxQueueDist
hashes, blocks := makeChain(targetBlocks, 0, genesis)
attack := make(map[common.Hash]*types.Block)
for i := byte(0); len(attack) < blockLimit+2*maxQueueDist; i++ {
hashes, blocks := makeChain(maxQueueDist-1, i, unknownBlock)
for _, hash := range hashes[:maxQueueDist-2] {
attack[hash] = blocks[hash]
}
}
// Try to feed all the attacker blocks make sure only a limited batch is accepted
for _, block := range attack {
tester.fetcher.Enqueue("attacker", block)
}
time.Sleep(200 * time.Millisecond)
if queued := atomic.LoadInt32(&enqueued); queued != blockLimit {
t.Fatalf("queued block count mismatch: have %d, want %d", queued, blockLimit)
}
// Queue up a batch of valid blocks, and check that a new peer is allowed to do so
for i := 0; i < maxQueueDist-1; i++ {
tester.fetcher.Enqueue("valid", blocks[hashes[len(hashes)-3-i]])
}
time.Sleep(100 * time.Millisecond)
if queued := atomic.LoadInt32(&enqueued); queued != blockLimit+maxQueueDist-1 {
t.Fatalf("queued block count mismatch: have %d, want %d", queued, blockLimit+maxQueueDist-1)
}
// Insert the missing piece (and sanity check the import)
tester.fetcher.Enqueue("valid", blocks[hashes[len(hashes)-2]])
verifyImportCount(t, imported, maxQueueDist)
// Insert the remaining blocks in chunks to ensure clean DOS protection
for i := maxQueueDist; i < len(hashes)-1; i++ {
tester.fetcher.Enqueue("valid", blocks[hashes[len(hashes)-2-i]])
verifyImportEvent(t, imported, true)
}
verifyImportDone(t, imported)
}