From af51dc4d637dbbb0d416032304f84d52d4f6d951 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Mon, 29 Jun 2015 17:37:55 +0300 Subject: [PATCH] eth, eth/downloader: pass the eth protocol version through --- eth/downloader/downloader.go | 4 +- eth/downloader/downloader_test.go | 70 +++++++++++++++++-------------- eth/downloader/peer.go | 5 ++- eth/handler.go | 2 +- 4 files changed, 45 insertions(+), 36 deletions(-) diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 39976aae1..1b31f5dbd 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -175,7 +175,7 @@ func (d *Downloader) Synchronising() bool { // RegisterPeer injects a new download peer into the set of block source to be // used for fetching hashes and blocks from. -func (d *Downloader) RegisterPeer(id string, head common.Hash, getHashes hashFetcherFn, getBlocks blockFetcherFn) error { +func (d *Downloader) RegisterPeer(id string, version int, head common.Hash, getHashes hashFetcherFn, getBlocks blockFetcherFn) error { // If the peer wants to send a banned hash, reject if d.banned.Has(head) { glog.V(logger.Debug).Infoln("Register rejected, head hash banned:", id) @@ -183,7 +183,7 @@ func (d *Downloader) RegisterPeer(id string, head common.Hash, getHashes hashFet } // Otherwise try to construct and register the peer glog.V(logger.Detail).Infoln("Registering peer", id) - if err := d.peers.Register(newPeer(id, head, getHashes, getBlocks)); err != nil { + if err := d.peers.Register(newPeer(id, version, head, getHashes, getBlocks)); err != nil { glog.V(logger.Error).Infoln("Register failed:", err) return err } diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 7feca8782..0e9b58005 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -16,6 +16,11 @@ import ( "github.com/ethereum/go-ethereum/event" ) +const ( + eth60 = 60 + eth61 = 61 +) + var ( testdb, _ = ethdb.NewMemDatabase() genesis = core.GenesisBlockForTesting(testdb, common.Address{}, big.NewInt(0)) @@ -112,15 +117,15 @@ func (dl *downloadTester) insertChain(blocks types.Blocks) (int, error) { } // newPeer registers a new block download source into the downloader. -func (dl *downloadTester) newPeer(id string, hashes []common.Hash, blocks map[common.Hash]*types.Block) error { - return dl.newSlowPeer(id, hashes, blocks, 0) +func (dl *downloadTester) newPeer(id string, version int, hashes []common.Hash, blocks map[common.Hash]*types.Block) error { + return dl.newSlowPeer(id, version, hashes, blocks, 0) } // newSlowPeer registers a new block download source into the downloader, with a // specific delay time on processing the network packets sent to it, simulating // potentially slow network IO. -func (dl *downloadTester) newSlowPeer(id string, hashes []common.Hash, blocks map[common.Hash]*types.Block, delay time.Duration) error { - err := dl.downloader.RegisterPeer(id, hashes[0], dl.peerGetHashesFn(id, delay), dl.peerGetBlocksFn(id, delay)) +func (dl *downloadTester) newSlowPeer(id string, version int, hashes []common.Hash, blocks map[common.Hash]*types.Block, delay time.Duration) error { + err := dl.downloader.RegisterPeer(id, version, hashes[0], dl.peerGetHashesFn(id, delay), dl.peerGetBlocksFn(id, delay)) if err == nil { // Assign the owned hashes and blocks to the peer (deep copy) dl.peerHashes[id] = make([]common.Hash, len(hashes)) @@ -201,7 +206,7 @@ func TestSynchronisation(t *testing.T) { hashes, blocks := makeChain(targetBlocks, 0, genesis) tester := newTester() - tester.newPeer("peer", hashes, blocks) + tester.newPeer("peer", eth60, hashes, blocks) // Synchronise with the peer and make sure all blocks were retrieved if err := tester.sync("peer"); err != nil { @@ -232,7 +237,7 @@ func TestCancel(t *testing.T) { hashes, blocks := makeChain(targetBlocks, 0, genesis) tester := newTester() - tester.newPeer("peer", hashes, blocks) + tester.newPeer("peer", eth60, hashes, blocks) // Make sure canceling works with a pristine downloader tester.downloader.cancel() @@ -259,7 +264,7 @@ func TestThrottling(t *testing.T) { hashes, blocks := makeChain(targetBlocks, 0, genesis) tester := newTester() - tester.newPeer("peer", hashes, blocks) + tester.newPeer("peer", eth60, hashes, blocks) // Wrap the importer to allow stepping done := make(chan int) @@ -317,7 +322,7 @@ func TestMultiSynchronisation(t *testing.T) { tester := newTester() for i := 0; i < targetPeers; i++ { id := fmt.Sprintf("peer #%d", i) - tester.newPeer(id, hashes[i*blockCacheLimit:], blocks) + tester.newPeer(id, eth60, hashes[i*blockCacheLimit:], blocks) } // Synchronise with the middle peer and make sure half of the blocks were retrieved id := fmt.Sprintf("peer #%d", targetPeers/2) @@ -347,8 +352,8 @@ func TestSlowSynchronisation(t *testing.T) { targetIODelay := time.Second hashes, blocks := makeChain(targetBlocks, 0, genesis) - tester.newSlowPeer("fast", hashes, blocks, 0) - tester.newSlowPeer("slow", hashes, blocks, targetIODelay) + tester.newSlowPeer("fast", eth60, hashes, blocks, 0) + tester.newSlowPeer("slow", eth60, hashes, blocks, targetIODelay) // Try to sync with the peers (pull hashes from fast) start := time.Now() @@ -370,13 +375,14 @@ func TestSlowSynchronisation(t *testing.T) { func TestNonExistingParentAttack(t *testing.T) { tester := newTester() + // Forge a single-link chain with a forged header hashes, blocks := makeChain(1, 0, genesis) - tester.newPeer("valid", hashes, blocks) + tester.newPeer("valid", eth60, hashes, blocks) wrongblock := types.NewBlock(&types.Header{}, nil, nil, nil) wrongblock.Td = blocks[hashes[0]].Td hashes, blocks = makeChain(1, 0, wrongblock) - tester.newPeer("attack", hashes, blocks) + tester.newPeer("attack", eth60, hashes, blocks) // Try and sync with the malicious node and check that it fails if err := tester.sync("attack"); err == nil { @@ -401,8 +407,8 @@ func TestRepeatingHashAttack(t *testing.T) { // TODO: Is this thing valid?? // Create a valid chain, but drop the last link hashes, blocks := makeChain(blockCacheLimit, 0, genesis) - tester.newPeer("valid", hashes, blocks) - tester.newPeer("attack", hashes[:len(hashes)-1], blocks) + tester.newPeer("valid", eth60, hashes, blocks) + tester.newPeer("attack", eth60, hashes[:len(hashes)-1], blocks) // Try and sync with the malicious node errc := make(chan error) @@ -431,10 +437,10 @@ func TestNonExistingBlockAttack(t *testing.T) { // Create a valid chain, but forge the last link hashes, blocks := makeChain(blockCacheLimit, 0, genesis) - tester.newPeer("valid", hashes, blocks) + tester.newPeer("valid", eth60, hashes, blocks) hashes[len(hashes)/2] = common.Hash{} - tester.newPeer("attack", hashes, blocks) + tester.newPeer("attack", eth60, hashes, blocks) // Try and sync with the malicious node and check that it fails if err := tester.sync("attack"); err != errPeersUnavailable { @@ -453,7 +459,7 @@ func TestInvalidHashOrderAttack(t *testing.T) { // Create a valid long chain, but reverse some hashes within hashes, blocks := makeChain(4*blockCacheLimit, 0, genesis) - tester.newPeer("valid", hashes, blocks) + tester.newPeer("valid", eth60, hashes, blocks) chunk1 := make([]common.Hash, blockCacheLimit) chunk2 := make([]common.Hash, blockCacheLimit) @@ -462,7 +468,7 @@ func TestInvalidHashOrderAttack(t *testing.T) { copy(hashes[2*blockCacheLimit:], chunk1) copy(hashes[blockCacheLimit:], chunk2) - tester.newPeer("attack", hashes, blocks) + tester.newPeer("attack", eth60, hashes, blocks) // Try and sync with the malicious node and check that it fails if err := tester.sync("attack"); err != errInvalidChain { @@ -489,8 +495,8 @@ func TestMadeupHashChainAttack(t *testing.T) { rand.Read(randomHashes[i][:]) } - tester.newPeer("valid", hashes, blocks) - tester.newPeer("attack", randomHashes, nil) + tester.newPeer("valid", eth60, hashes, blocks) + tester.newPeer("attack", eth60, randomHashes, nil) // Try and sync with the malicious node and check that it fails if err := tester.sync("attack"); err != errCrossCheckFailed { @@ -517,7 +523,7 @@ func TestMadeupHashChainDrippingAttack(t *testing.T) { // Try and sync with the attacker, one hash at a time tester.maxHashFetch = 1 - tester.newPeer("attack", randomHashes, nil) + tester.newPeer("attack", eth60, randomHashes, nil) if err := tester.sync("attack"); err != errStallingPeer { t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errStallingPeer) } @@ -540,7 +546,7 @@ func TestMadeupBlockChainAttack(t *testing.T) { } // Try and sync with the malicious node and check that it fails tester := newTester() - tester.newPeer("attack", gapped, blocks) + tester.newPeer("attack", eth60, gapped, blocks) if err := tester.sync("attack"); err != errCrossCheckFailed { t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errCrossCheckFailed) } @@ -548,13 +554,13 @@ func TestMadeupBlockChainAttack(t *testing.T) { blockSoftTTL = defaultBlockTTL crossCheckCycle = defaultCrossCheckCycle - tester.newPeer("valid", hashes, blocks) + tester.newPeer("valid", eth60, hashes, blocks) if err := tester.sync("valid"); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } } -// tests that if one/multiple malicious peers try to feed a banned blockchain to +// Tests that if one/multiple malicious peers try to feed a banned blockchain to // the downloader, it will not keep refetching the same chain indefinitely, but // gradually block pieces of it, until its head is also blocked. func TestBannedChainStarvationAttack(t *testing.T) { @@ -565,8 +571,8 @@ func TestBannedChainStarvationAttack(t *testing.T) { // Create the tester and ban the selected hash. tester := newTester() tester.downloader.banned.Add(forkHashes[fork-1]) - tester.newPeer("valid", hashes, blocks) - tester.newPeer("attack", forkHashes, forkBlocks) + tester.newPeer("valid", eth60, hashes, blocks) + tester.newPeer("attack", eth60, forkHashes, forkBlocks) // Iteratively try to sync, and verify that the banned hash list grows until // the head of the invalid chain is blocked too. @@ -586,7 +592,7 @@ func TestBannedChainStarvationAttack(t *testing.T) { banned = bans } // Check that after banning an entire chain, bad peers get dropped - if err := tester.newPeer("new attacker", forkHashes, forkBlocks); err != errBannedHead { + if err := tester.newPeer("new attacker", eth60, forkHashes, forkBlocks); err != errBannedHead { t.Fatalf("peer registration mismatch: have %v, want %v", err, errBannedHead) } if peer := tester.downloader.peers.Peer("new attacker"); peer != nil { @@ -618,8 +624,8 @@ func TestBannedChainMemoryExhaustionAttack(t *testing.T) { MaxBlockFetch = 4 maxBannedHashes = 256 - tester.newPeer("valid", hashes, blocks) - tester.newPeer("attack", forkHashes, forkBlocks) + tester.newPeer("valid", eth60, hashes, blocks) + tester.newPeer("attack", eth60, forkHashes, forkBlocks) // Iteratively try to sync, and verify that the banned hash list grows until // the head of the invalid chain is blocked too. @@ -664,7 +670,7 @@ func TestOverlappingDeliveryAttack(t *testing.T) { // Register an attacker that always returns non-requested blocks too tester := newTester() - tester.newPeer("attack", hashes, blocks) + tester.newPeer("attack", eth60, hashes, blocks) rawGetBlocks := tester.downloader.peers.Peer("attack").getBlocks tester.downloader.peers.Peer("attack").getBlocks = func(request []common.Hash) error { @@ -712,7 +718,7 @@ func TestHashAttackerDropping(t *testing.T) { for i, tt := range tests { // Register a new peer and ensure it's presence id := fmt.Sprintf("test %d", i) - if err := tester.newPeer(id, []common.Hash{genesis.Hash()}, nil); err != nil { + if err := tester.newPeer(id, eth60, []common.Hash{genesis.Hash()}, nil); err != nil { t.Fatalf("test %d: failed to register new peer: %v", i, err) } if _, ok := tester.peerHashes[id]; !ok { @@ -744,7 +750,7 @@ func TestBlockAttackerDropping(t *testing.T) { for i, tt := range tests { // Register a new peer and ensure it's presence id := fmt.Sprintf("test %d", i) - if err := tester.newPeer(id, []common.Hash{common.Hash{}}, nil); err != nil { + if err := tester.newPeer(id, eth60, []common.Hash{common.Hash{}}, nil); err != nil { t.Fatalf("test %d: failed to register new peer: %v", i, err) } if _, ok := tester.peerHashes[id]; !ok { diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go index f36e133e4..7176cc06b 100644 --- a/eth/downloader/peer.go +++ b/eth/downloader/peer.go @@ -39,11 +39,13 @@ type peer struct { getHashes hashFetcherFn // Method to retrieve a batch of hashes (mockable for testing) getBlocks blockFetcherFn // Method to retrieve a batch of blocks (mockable for testing) + + version int // Eth protocol version number to switch strategies } // newPeer create a new downloader peer, with specific hash and block retrieval // mechanisms. -func newPeer(id string, head common.Hash, getHashes hashFetcherFn, getBlocks blockFetcherFn) *peer { +func newPeer(id string, version int, head common.Hash, getHashes hashFetcherFn, getBlocks blockFetcherFn) *peer { return &peer{ id: id, head: head, @@ -51,6 +53,7 @@ func newPeer(id string, head common.Hash, getHashes hashFetcherFn, getBlocks blo getHashes: getHashes, getBlocks: getBlocks, ignored: set.New(), + version: version, } } diff --git a/eth/handler.go b/eth/handler.go index 3705f5bb6..712d65220 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -181,7 +181,7 @@ func (pm *ProtocolManager) handle(p *peer) error { defer pm.removePeer(p.id) // Register the peer in the downloader. If the downloader considers it banned, we disconnect - if err := pm.downloader.RegisterPeer(p.id, p.Head(), p.RequestHashes, p.RequestBlocks); err != nil { + if err := pm.downloader.RegisterPeer(p.id, p.version, p.Head(), p.RequestHashes, p.RequestBlocks); err != nil { return err } // Propagate existing transactions. new transactions appearing