From 66d3dc8690e0aa551e7b35a17006a2135b51c9bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Thu, 11 Jun 2015 15:56:08 +0300 Subject: [PATCH 01/12] eth, eth/downloader: move peer removal into downloader --- eth/backend.go | 8 ++- eth/downloader/downloader.go | 87 +++++++++++++++++++++---------- eth/downloader/downloader_test.go | 32 ++++++------ eth/downloader/queue.go | 2 +- eth/handler.go | 4 +- eth/sync.go | 32 +----------- 6 files changed, 83 insertions(+), 82 deletions(-) diff --git a/eth/backend.go b/eth/backend.go index d2ec0cc62..4ebf21811 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -193,7 +193,6 @@ type Ethereum struct { whisper *whisper.Whisper pow *ethash.Ethash protocolManager *ProtocolManager - downloader *downloader.Downloader SolcPath string solc *compiler.Solidity @@ -290,14 +289,13 @@ func New(config *Config) (*Ethereum, error) { if err != nil { return nil, err } - eth.downloader = downloader.New(eth.EventMux(), eth.chainManager.HasBlock, eth.chainManager.GetBlock) eth.txPool = core.NewTxPool(eth.EventMux(), eth.chainManager.State, eth.chainManager.GasLimit) eth.blockProcessor = core.NewBlockProcessor(stateDb, extraDb, eth.pow, eth.chainManager, eth.EventMux()) eth.chainManager.SetProcessor(eth.blockProcessor) + eth.protocolManager = NewProtocolManager(config.ProtocolVersion, config.NetworkId, eth.eventMux, eth.txPool, eth.chainManager) + eth.miner = miner.New(eth, eth.EventMux(), eth.pow) eth.miner.SetGasPrice(config.GasPrice) - - eth.protocolManager = NewProtocolManager(config.ProtocolVersion, config.NetworkId, eth.eventMux, eth.txPool, eth.chainManager, eth.downloader) if config.Shh { eth.whisper = whisper.New() eth.shhVersionId = int(eth.whisper.Version()) @@ -447,7 +445,7 @@ func (s *Ethereum) ClientVersion() string { return s.clientVersio func (s *Ethereum) EthVersion() int { return s.ethVersionId } func (s *Ethereum) NetVersion() int { return s.netVersionId } func (s *Ethereum) ShhVersion() int { return s.shhVersionId } -func (s *Ethereum) Downloader() *downloader.Downloader { return s.downloader } +func (s *Ethereum) Downloader() *downloader.Downloader { return s.protocolManager.downloader } // Start the ethereum func (s *Ethereum) Start() error { diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index f0a515d12..499b3a585 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -32,28 +32,32 @@ var ( var ( errLowTd = errors.New("peers TD is too low") - ErrBusy = errors.New("busy") + errBusy = errors.New("busy") errUnknownPeer = errors.New("peer is unknown or unhealthy") - ErrBadPeer = errors.New("action from bad peer ignored") - ErrStallingPeer = errors.New("peer is stalling") + errBadPeer = errors.New("action from bad peer ignored") + errStallingPeer = errors.New("peer is stalling") errBannedHead = errors.New("peer head hash already banned") errNoPeers = errors.New("no peers to keep download active") - ErrPendingQueue = errors.New("pending items in queue") - ErrTimeout = errors.New("timeout") - ErrEmptyHashSet = errors.New("empty hash set by peer") + errPendingQueue = errors.New("pending items in queue") + errTimeout = errors.New("timeout") + errEmptyHashSet = errors.New("empty hash set by peer") errPeersUnavailable = errors.New("no peers available or all peers tried for block download process") errAlreadyInPool = errors.New("hash already in pool") - ErrInvalidChain = errors.New("retrieved hash chain is invalid") - ErrCrossCheckFailed = errors.New("block cross-check failed") + 
errInvalidChain     = errors.New("retrieved hash chain is invalid")
+	errCrossCheckFailed = errors.New("block cross-check failed")
 	errCancelHashFetch  = errors.New("hash fetching cancelled (requested)")
 	errCancelBlockFetch = errors.New("block downloading cancelled (requested)")
 	errNoSyncActive     = errors.New("no sync active")
 )
 
+// hashCheckFn is a callback type for verifying a hash's presence in the local chain.
 type hashCheckFn func(common.Hash) bool
-type getBlockFn func(common.Hash) *types.Block
-type chainInsertFn func(types.Blocks) (int, error)
-type hashIterFn func() (common.Hash, error)
+
+// blockRetrievalFn is a callback type for retrieving a block from the local chain.
+type blockRetrievalFn func(common.Hash) *types.Block
+
+// peerDropFn is a callback type for dropping a peer detected as malicious.
+type peerDropFn func(id string)
 
 type blockPack struct {
 	peerId string
@@ -85,8 +89,9 @@ type Downloader struct {
 	importLock sync.Mutex
 
 	// Callbacks
-	hasBlock hashCheckFn
-	getBlock getBlockFn
+	hasBlock hashCheckFn      // Checks if a block is present in the chain
+	getBlock blockRetrievalFn // Retrieves a block from the chain
+	dropPeer peerDropFn       // Drops a peer for misbehaving
 
 	// Status
 	synchronising int32
@@ -107,7 +112,8 @@ type Block struct {
 	OriginPeer string
 }
 
-func New(mux *event.TypeMux, hasBlock hashCheckFn, getBlock getBlockFn) *Downloader {
+// New creates a new downloader to fetch hashes and blocks from remote peers.
+func New(mux *event.TypeMux, hasBlock hashCheckFn, getBlock blockRetrievalFn, dropPeer peerDropFn) *Downloader {
 	// Create the base downloader
 	downloader := &Downloader{
 		mux: mux,
@@ -115,6 +121,7 @@ func New(mux *event.TypeMux, hasBlock hashCheckFn, getBlock getBlockFn) *Downloa
 		peers: newPeerSet(),
 		hasBlock: hasBlock,
 		getBlock: getBlock,
+		dropPeer: dropPeer,
 		newPeerCh: make(chan *peer, 1),
 		hashCh: make(chan hashPack, 1),
 		blockCh: make(chan blockPack, 1),
@@ -183,19 +190,43 @@ func (d *Downloader) UnregisterPeer(id string) error {
 	return nil
 }
 
-// Synchronise will select the peer and use it for synchronising. If an empty string is given
+// Synchronise tries to sync up our local block chain with a remote peer, adding
+// various sanity checks and wrapping the attempt with various log entries.
+func (d *Downloader) Synchronise(id string, head common.Hash) {
+	glog.V(logger.Detail).Infof("Attempting synchronisation: %v, 0x%x", id, head)
+
+	switch err := d.synchronise(id, head); err {
+	case nil:
+		glog.V(logger.Detail).Infof("Synchronisation completed")
+
+	case errBusy:
+		glog.V(logger.Detail).Infof("Synchronisation already in progress")
+
+	case errTimeout, errBadPeer, errEmptyHashSet, errInvalidChain, errCrossCheckFailed:
+		glog.V(logger.Debug).Infof("Removing peer %v: %v", id, err)
+		d.dropPeer(id)
+
+	case errPendingQueue:
+		glog.V(logger.Debug).Infoln("Synchronisation aborted:", err)
+
+	default:
+		glog.V(logger.Warn).Infof("Synchronisation failed: %v", err)
+	}
+}
+
+// synchronise will select the peer and use it for synchronising. If an empty string is given
 // it will use the best peer possible and synchronize if its TD is higher than our own. If any of the
 // checks fail an error will be returned.
This method is synchronous -func (d *Downloader) Synchronise(id string, hash common.Hash) error { +func (d *Downloader) synchronise(id string, hash common.Hash) error { // Make sure only one goroutine is ever allowed past this point at once if !atomic.CompareAndSwapInt32(&d.synchronising, 0, 1) { - return ErrBusy + return errBusy } defer atomic.StoreInt32(&d.synchronising, 0) // If the head hash is banned, terminate immediately if d.banned.Has(hash) { - return ErrInvalidChain + return errInvalidChain } // Post a user notification of the sync (only once per session) if atomic.CompareAndSwapInt32(&d.notified, 0, 1) { @@ -209,7 +240,7 @@ func (d *Downloader) Synchronise(id string, hash common.Hash) error { // Abort if the queue still contains some leftover data if _, cached := d.queue.Size(); cached > 0 && d.queue.GetHeadBlock() != nil { - return ErrPendingQueue + return errPendingQueue } // Reset the queue and peer set to clean any internal leftover state d.queue.Reset() @@ -342,7 +373,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { // Make sure the peer actually gave something valid if len(hashPack.hashes) == 0 { glog.V(logger.Debug).Infof("Peer (%s) responded with empty hash set", active.id) - return ErrEmptyHashSet + return errEmptyHashSet } for index, hash := range hashPack.hashes { if d.banned.Has(hash) { @@ -352,7 +383,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { if err := d.banBlocks(active.id, hash); err != nil { glog.V(logger.Debug).Infof("Failed to ban batch of blocks: %v", err) } - return ErrInvalidChain + return errInvalidChain } } // Determine if we're done fetching hashes (queue up all pending), and continue if not done @@ -369,12 +400,12 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { inserts := d.queue.Insert(hashPack.hashes) if len(inserts) == 0 && !done { glog.V(logger.Debug).Infof("Peer (%s) responded with stale hashes", active.id) - return ErrBadPeer + return errBadPeer } if !done { // Check that the peer is not stalling the sync if len(inserts) < MinHashFetch { - return ErrStallingPeer + return errStallingPeer } // Try and fetch a random block to verify the hash batch // Skip the last hash as the cross check races with the next hash fetch @@ -408,7 +439,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { block := blockPack.blocks[0] if check, ok := d.checks[block.Hash()]; ok { if block.ParentHash() != check.parent { - return ErrCrossCheckFailed + return errCrossCheckFailed } delete(d.checks, block.Hash()) } @@ -418,7 +449,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { for hash, check := range d.checks { if time.Now().After(check.expire) { glog.V(logger.Debug).Infof("Cross check timeout for %x", hash) - return ErrCrossCheckFailed + return errCrossCheckFailed } } @@ -438,7 +469,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { // if all peers have been tried, abort the process entirely or if the hash is // the zero hash. if p == nil || (head == common.Hash{}) { - return ErrTimeout + return errTimeout } // set p to the active peer. this will invalidate any hashes that may be returned // by our previous (delayed) peer. 
@@ -500,7 +531,7 @@ out: peer.SetIdle() glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blockPack.blocks)) - case ErrInvalidChain: + case errInvalidChain: // The hash chain is invalid (blocks are not ordered properly), abort return err @@ -617,7 +648,7 @@ func (d *Downloader) banBlocks(peerId string, head common.Hash) error { return errCancelBlockFetch case <-timeout: - return ErrTimeout + return errTimeout case <-d.hashCh: // Out of bounds hashes received, ignore them diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 5f10fb41f..5e79c10c9 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -73,7 +73,7 @@ func newTester(t *testing.T, hashes []common.Hash, blocks map[common.Hash]*types done: make(chan bool), } var mux event.TypeMux - downloader := New(&mux, tester.hasBlock, tester.getBlock) + downloader := New(&mux, tester.hasBlock, tester.getBlock, nil) tester.downloader = downloader return tester @@ -83,7 +83,7 @@ func newTester(t *testing.T, hashes []common.Hash, blocks map[common.Hash]*types // block until it returns func (dl *downloadTester) sync(peerId string, head common.Hash) error { dl.activePeerId = peerId - return dl.downloader.Synchronise(peerId, head) + return dl.downloader.synchronise(peerId, head) } // syncTake is starts synchronising with a remote peer, but concurrently it also @@ -415,8 +415,8 @@ func TestInvalidHashOrderAttack(t *testing.T) { // Try and sync with the malicious node and check that it fails tester := newTester(t, reverse, blocks) tester.newPeer("attack", big.NewInt(10000), reverse[0]) - if _, err := tester.syncTake("attack", reverse[0]); err != ErrInvalidChain { - t.Fatalf("synchronisation error mismatch: have %v, want %v", err, ErrInvalidChain) + if _, err := tester.syncTake("attack", reverse[0]); err != errInvalidChain { + t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errInvalidChain) } // Ensure that a valid chain can still pass sync tester.hashes = hashes @@ -438,8 +438,8 @@ func TestMadeupHashChainAttack(t *testing.T) { // Try and sync with the malicious node and check that it fails tester := newTester(t, hashes, nil) tester.newPeer("attack", big.NewInt(10000), hashes[0]) - if _, err := tester.syncTake("attack", hashes[0]); err != ErrCrossCheckFailed { - t.Fatalf("synchronisation error mismatch: have %v, want %v", err, ErrCrossCheckFailed) + if _, err := tester.syncTake("attack", hashes[0]); err != errCrossCheckFailed { + t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errCrossCheckFailed) } } @@ -455,8 +455,8 @@ func TestMadeupHashChainDrippingAttack(t *testing.T) { // Try and sync with the attacker, one hash at a time tester.maxHashFetch = 1 tester.newPeer("attack", big.NewInt(10000), hashes[0]) - if _, err := tester.syncTake("attack", hashes[0]); err != ErrStallingPeer { - t.Fatalf("synchronisation error mismatch: have %v, want %v", err, ErrStallingPeer) + if _, err := tester.syncTake("attack", hashes[0]); err != errStallingPeer { + t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errStallingPeer) } } @@ -480,8 +480,8 @@ func TestMadeupBlockChainAttack(t *testing.T) { // Try and sync with the malicious node and check that it fails tester := newTester(t, gapped, blocks) tester.newPeer("attack", big.NewInt(10000), gapped[0]) - if _, err := tester.syncTake("attack", gapped[0]); err != ErrCrossCheckFailed { - t.Fatalf("synchronisation error mismatch: have %v, want %v", err, ErrCrossCheckFailed) + if _, err := 
tester.syncTake("attack", gapped[0]); err != errCrossCheckFailed { + t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errCrossCheckFailed) } // Ensure that a valid chain can still pass sync blockSoftTTL = defaultBlockTTL @@ -514,8 +514,8 @@ func TestMadeupParentBlockChainAttack(t *testing.T) { // Try and sync with the malicious node and check that it fails tester := newTester(t, hashes, forges) tester.newPeer("attack", big.NewInt(10000), hashes[0]) - if _, err := tester.syncTake("attack", hashes[0]); err != ErrCrossCheckFailed { - t.Fatalf("synchronisation error mismatch: have %v, want %v", err, ErrCrossCheckFailed) + if _, err := tester.syncTake("attack", hashes[0]); err != errCrossCheckFailed { + t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errCrossCheckFailed) } // Ensure that a valid chain can still pass sync blockSoftTTL = defaultBlockTTL @@ -547,8 +547,8 @@ func TestBannedChainStarvationAttack(t *testing.T) { tester.newPeer("attack", big.NewInt(10000), hashes[0]) for banned := tester.downloader.banned.Size(); ; { // Try to sync with the attacker, check hash chain failure - if _, err := tester.syncTake("attack", hashes[0]); err != ErrInvalidChain { - t.Fatalf("synchronisation error mismatch: have %v, want %v", err, ErrInvalidChain) + if _, err := tester.syncTake("attack", hashes[0]); err != errInvalidChain { + t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errInvalidChain) } // Check that the ban list grew with at least 1 new item, or all banned bans := tester.downloader.banned.Size() @@ -592,8 +592,8 @@ func TestBannedChainMemoryExhaustionAttack(t *testing.T) { tester.newPeer("attack", big.NewInt(10000), hashes[0]) for { // Try to sync with the attacker, check hash chain failure - if _, err := tester.syncTake("attack", hashes[0]); err != ErrInvalidChain { - t.Fatalf("synchronisation error mismatch: have %v, want %v", err, ErrInvalidChain) + if _, err := tester.syncTake("attack", hashes[0]); err != errInvalidChain { + t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errInvalidChain) } // Short circuit if the entire chain was banned if tester.downloader.banned.Has(hashes[0]) { diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index 7abbd42fd..903f043eb 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -320,7 +320,7 @@ func (q *queue) Deliver(id string, blocks []*types.Block) (err error) { // If a requested block falls out of the range, the hash chain is invalid index := int(block.NumberU64()) - q.blockOffset if index >= len(q.blockCache) || index < 0 { - return ErrInvalidChain + return errInvalidChain } // Otherwise merge the block and mark the hash block q.blockCache[index] = &Block{ diff --git a/eth/handler.go b/eth/handler.go index f002727f3..ac7fb8fcf 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -68,12 +68,11 @@ type ProtocolManager struct { // NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable // with the ethereum network. 
-func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpool txPool, chainman *core.ChainManager, downloader *downloader.Downloader) *ProtocolManager { +func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpool txPool, chainman *core.ChainManager) *ProtocolManager { manager := &ProtocolManager{ eventMux: mux, txpool: txpool, chainman: chainman, - downloader: downloader, peers: newPeerSet(), newPeerCh: make(chan *peer, 1), newHashCh: make(chan []*blockAnnounce, 1), @@ -81,6 +80,7 @@ func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpo txsyncCh: make(chan *txsync), quitSync: make(chan struct{}), } + manager.downloader = downloader.New(manager.eventMux, manager.chainman.HasBlock, manager.chainman.GetBlock, manager.removePeer) manager.SubProtocol = p2p.Protocol{ Name: "eth", Version: uint(protocolVersion), diff --git a/eth/sync.go b/eth/sync.go index 8fee21d7b..b127ca979 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -8,7 +8,6 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/p2p/discover" @@ -332,33 +331,6 @@ func (pm *ProtocolManager) synchronise(peer *peer) { if peer.Td().Cmp(pm.chainman.Td()) <= 0 { return } - // FIXME if we have the hash in our chain and the TD of the peer is - // much higher than ours, something is wrong with us or the peer. - // Check if the hash is on our own chain - head := peer.Head() - if pm.chainman.HasBlock(head) { - glog.V(logger.Debug).Infoln("Synchronisation canceled: head already known") - return - } - // Get the hashes from the peer (synchronously) - glog.V(logger.Detail).Infof("Attempting synchronisation: %v, 0x%x", peer.id, head) - - err := pm.downloader.Synchronise(peer.id, head) - switch err { - case nil: - glog.V(logger.Detail).Infof("Synchronisation completed") - - case downloader.ErrBusy: - glog.V(logger.Detail).Infof("Synchronisation already in progress") - - case downloader.ErrTimeout, downloader.ErrBadPeer, downloader.ErrEmptyHashSet, downloader.ErrInvalidChain, downloader.ErrCrossCheckFailed: - glog.V(logger.Debug).Infof("Removing peer %v: %v", peer.id, err) - pm.removePeer(peer.id) - - case downloader.ErrPendingQueue: - glog.V(logger.Debug).Infoln("Synchronisation aborted:", err) - - default: - glog.V(logger.Warn).Infof("Synchronisation failed: %v", err) - } + // Otherwise try to sync with the downloader + pm.downloader.Synchronise(peer.id, peer.Head()) } From 2937903299b9b965eb54a9d5031714d6a7af972e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Thu, 11 Jun 2015 16:11:56 +0300 Subject: [PATCH 02/12] eth/downloader: remove uneeded testing functions --- eth/downloader/downloader_test.go | 9 ++------- eth/downloader/queue_test.go | 30 ------------------------------ 2 files changed, 2 insertions(+), 37 deletions(-) delete mode 100644 eth/downloader/queue_test.go diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 5e79c10c9..e7dfbc70b 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -214,18 +214,13 @@ func TestBlockTaking(t *testing.T) { // Tests that an inactive downloader will not accept incoming hashes and blocks. 
func TestInactiveDownloader(t *testing.T) { - // Create a small enough block chain to download and the tester - targetBlocks := blockCacheLimit - 15 - hashes := createHashes(0, targetBlocks) - blocks := createBlocksFromHashSet(createHashSet(hashes)) - tester := newTester(t, nil, nil) // Check that neither hashes nor blocks are accepted - if err := tester.downloader.DeliverHashes("bad peer", hashes); err != errNoSyncActive { + if err := tester.downloader.DeliverHashes("bad peer", []common.Hash{}); err != errNoSyncActive { t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive) } - if err := tester.downloader.DeliverBlocks("bad peer", blocks); err != errNoSyncActive { + if err := tester.downloader.DeliverBlocks("bad peer", []*types.Block{}); err != errNoSyncActive { t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive) } } diff --git a/eth/downloader/queue_test.go b/eth/downloader/queue_test.go deleted file mode 100644 index ee6141f71..000000000 --- a/eth/downloader/queue_test.go +++ /dev/null @@ -1,30 +0,0 @@ -package downloader - -import ( - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/types" - "gopkg.in/fatih/set.v0" -) - -func createHashSet(hashes []common.Hash) *set.Set { - hset := set.New() - - for _, hash := range hashes { - hset.Add(hash) - } - - return hset -} - -func createBlocksFromHashSet(hashes *set.Set) []*types.Block { - blocks := make([]*types.Block, hashes.Size()) - - var i int - hashes.Each(func(v interface{}) bool { - blocks[i] = createBlock(i, common.Hash{}, v.(common.Hash)) - i++ - return true - }) - - return blocks -} From 2dd6a62f67a217cc2186f04d7d99565f7782c79b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Thu, 11 Jun 2015 17:14:45 +0300 Subject: [PATCH 03/12] eth/downloader: support individual peers in the test suite --- eth/downloader/downloader_test.go | 235 +++++++++++++++--------------- 1 file changed, 118 insertions(+), 117 deletions(-) diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index e7dfbc70b..6269ed87c 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -16,6 +16,8 @@ var ( knownHash = common.Hash{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0} unknownHash = common.Hash{9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9} bannedHash = common.Hash{5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5} + + genesis = createBlock(1, common.Hash{}, knownHash) ) func createHashes(start, amount int) (hashes []common.Hash) { @@ -51,26 +53,20 @@ func createBlocksFromHashes(hashes []common.Hash) map[common.Hash]*types.Block { type downloadTester struct { downloader *Downloader - hashes []common.Hash // Chain of hashes simulating - blocks map[common.Hash]*types.Block // Blocks associated with the hashes - chain []common.Hash // Block-chain being constructed + ownHashes []common.Hash // Hash chain belonging to the tester + ownBlocks map[common.Hash]*types.Block // Blocks belonging to the tester + peerHashes map[string][]common.Hash // Hash chain belonging to different test peers + peerBlocks map[string]map[common.Hash]*types.Block // Blocks belonging to different test peers maxHashFetch int // Overrides the maximum number of retrieved hashes - - t *testing.T - done chan bool - activePeerId string } -func newTester(t *testing.T, hashes []common.Hash, blocks 
map[common.Hash]*types.Block) *downloadTester { +func newTester() *downloadTester { tester := &downloadTester{ - t: t, - - hashes: hashes, - blocks: blocks, - chain: []common.Hash{knownHash}, - - done: make(chan bool), + ownHashes: []common.Hash{knownHash}, + ownBlocks: map[common.Hash]*types.Block{knownHash: genesis}, + peerHashes: make(map[string][]common.Hash), + peerBlocks: make(map[string]map[common.Hash]*types.Block), } var mux event.TypeMux downloader := New(&mux, tester.hasBlock, tester.getBlock, nil) @@ -79,13 +75,6 @@ func newTester(t *testing.T, hashes []common.Hash, blocks map[common.Hash]*types return tester } -// sync is a simple wrapper around the downloader to start synchronisation and -// block until it returns -func (dl *downloadTester) sync(peerId string, head common.Hash) error { - dl.activePeerId = peerId - return dl.downloader.synchronise(peerId, head) -} - // syncTake is starts synchronising with a remote peer, but concurrently it also // starts fetching blocks that the downloader retrieved. IT blocks until both go // routines terminate. @@ -102,12 +91,17 @@ func (dl *downloadTester) syncTake(peerId string, head common.Hash) ([]*Block, e time.Sleep(time.Millisecond) } // Take a batch of blocks and accumulate - took = append(took, dl.downloader.TakeBlocks()...) + blocks := dl.downloader.TakeBlocks() + for _, block := range blocks { + dl.ownHashes = append(dl.ownHashes, block.RawBlock.Hash()) + dl.ownBlocks[block.RawBlock.Hash()] = block.RawBlock + } + took = append(took, blocks...) } done <- struct{}{} }() // Start the downloading, sync the taker and return - err := dl.sync(peerId, head) + err := dl.downloader.synchronise(peerId, head) done <- struct{}{} <-done @@ -115,63 +109,74 @@ func (dl *downloadTester) syncTake(peerId string, head common.Hash) ([]*Block, e return took, err } +// hasBlock checks if a block is present in the testers canonical chain. func (dl *downloadTester) hasBlock(hash common.Hash) bool { - for _, h := range dl.chain { - if h == hash { - return true - } - } - return false + return dl.getBlock(hash) != nil } +// getBlock retrieves a block from the testers canonical chain. func (dl *downloadTester) getBlock(hash common.Hash) *types.Block { - return dl.blocks[knownHash] + return dl.ownBlocks[hash] } -// getHashes retrieves a batch of hashes for reconstructing the chain. -func (dl *downloadTester) getHashes(head common.Hash) error { - limit := MaxHashFetch - if dl.maxHashFetch > 0 { - limit = dl.maxHashFetch +// newPeer registers a new block download source into the downloader. +func (dl *downloadTester) newPeer(id string, hashes []common.Hash, blocks map[common.Hash]*types.Block) error { + err := dl.downloader.RegisterPeer(id, hashes[0], dl.peerGetHashesFn(id), dl.peerGetBlocksFn(id)) + if err == nil { + // Assign the owned hashes and blocks to the peer + dl.peerHashes[id] = hashes + dl.peerBlocks[id] = blocks } - // Gather the next batch of hashes - hashes := make([]common.Hash, 0, limit) - for i, hash := range dl.hashes { - if hash == head { - i++ - for len(hashes) < cap(hashes) && i < len(dl.hashes) { - hashes = append(hashes, dl.hashes[i]) + return err +} + +// peerGetBlocksFn constructs a getHashes function associated with a particular +// peer in the download tester. The returned function can be used to retrieve +// batches of hashes from the particularly requested peer. 
+func (dl *downloadTester) peerGetHashesFn(id string) func(head common.Hash) error { + return func(head common.Hash) error { + limit := MaxHashFetch + if dl.maxHashFetch > 0 { + limit = dl.maxHashFetch + } + // Gather the next batch of hashes + hashes := dl.peerHashes[id] + result := make([]common.Hash, 0, limit) + for i, hash := range hashes { + if hash == head { i++ - } - break - } - } - // Delay delivery a bit to allow attacks to unfold - id := dl.activePeerId - go func() { - time.Sleep(time.Millisecond) - dl.downloader.DeliverHashes(id, hashes) - }() - return nil -} - -func (dl *downloadTester) getBlocks(id string) func([]common.Hash) error { - return func(hashes []common.Hash) error { - blocks := make([]*types.Block, 0, len(hashes)) - for _, hash := range hashes { - if block, ok := dl.blocks[hash]; ok { - blocks = append(blocks, block) + for len(result) < cap(result) && i < len(hashes) { + result = append(result, hashes[i]) + i++ + } + break } } - go dl.downloader.DeliverBlocks(id, blocks) - + // Delay delivery a bit to allow attacks to unfold + go func() { + time.Sleep(time.Millisecond) + dl.downloader.DeliverHashes(id, result) + }() return nil } } -// newPeer registers a new block download source into the syncer. -func (dl *downloadTester) newPeer(id string, td *big.Int, hash common.Hash) error { - return dl.downloader.RegisterPeer(id, hash, dl.getHashes, dl.getBlocks(id)) +// peerGetBlocksFn constructs a getBlocks function associated with a particular +// peer in the download tester. The returned function can be used to retrieve +// batches of blocks from the particularly requested peer. +func (dl *downloadTester) peerGetBlocksFn(id string) func([]common.Hash) error { + return func(hashes []common.Hash) error { + blocks := dl.peerBlocks[id] + result := make([]*types.Block, 0, len(hashes)) + for _, hash := range hashes { + if block, ok := blocks[hash]; ok { + result = append(result, block) + } + } + go dl.downloader.DeliverBlocks(id, result) + + return nil + } } // Tests that simple synchronization, without throttling from a good peer works. @@ -181,11 +186,11 @@ func TestSynchronisation(t *testing.T) { hashes := createHashes(0, targetBlocks) blocks := createBlocksFromHashes(hashes) - tester := newTester(t, hashes, blocks) - tester.newPeer("peer", big.NewInt(10000), hashes[0]) + tester := newTester() + tester.newPeer("peer", hashes, blocks) // Synchronise with the peer and make sure all blocks were retrieved - if err := tester.sync("peer", hashes[0]); err != nil { + if err := tester.downloader.synchronise("peer", hashes[0]); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } if queued := len(tester.downloader.queue.blockPool); queued != targetBlocks { @@ -200,11 +205,11 @@ func TestBlockTaking(t *testing.T) { hashes := createHashes(0, targetBlocks) blocks := createBlocksFromHashes(hashes) - tester := newTester(t, hashes, blocks) - tester.newPeer("peer", big.NewInt(10000), hashes[0]) + tester := newTester() + tester.newPeer("peer", hashes, blocks) // Synchronise with the peer and test block retrieval - if err := tester.sync("peer", hashes[0]); err != nil { + if err := tester.downloader.synchronise("peer", hashes[0]); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } if took := tester.downloader.TakeBlocks(); len(took) != targetBlocks { @@ -214,7 +219,7 @@ func TestBlockTaking(t *testing.T) { // Tests that an inactive downloader will not accept incoming hashes and blocks. 
func TestInactiveDownloader(t *testing.T) { - tester := newTester(t, nil, nil) + tester := newTester() // Check that neither hashes nor blocks are accepted if err := tester.downloader.DeliverHashes("bad peer", []common.Hash{}); err != errNoSyncActive { @@ -232,11 +237,11 @@ func TestCancel(t *testing.T) { hashes := createHashes(0, targetBlocks) blocks := createBlocksFromHashes(hashes) - tester := newTester(t, hashes, blocks) - tester.newPeer("peer", big.NewInt(10000), hashes[0]) + tester := newTester() + tester.newPeer("peer", hashes, blocks) // Synchronise with the peer, but cancel afterwards - if err := tester.sync("peer", hashes[0]); err != nil { + if err := tester.downloader.synchronise("peer", hashes[0]); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } if !tester.downloader.Cancel() { @@ -260,13 +265,13 @@ func TestThrottling(t *testing.T) { hashes := createHashes(0, targetBlocks) blocks := createBlocksFromHashes(hashes) - tester := newTester(t, hashes, blocks) - tester.newPeer("peer", big.NewInt(10000), hashes[0]) + tester := newTester() + tester.newPeer("peer", hashes, blocks) // Start a synchronisation concurrently errc := make(chan error) go func() { - errc <- tester.sync("peer", hashes[0]) + errc <- tester.downloader.synchronise("peer", hashes[0]) }() // Iteratively take some blocks, always checking the retrieval count for total := 0; total < targetBlocks; { @@ -303,9 +308,9 @@ func TestNonExistingParentAttack(t *testing.T) { forged.ParentHeaderHash = unknownHash // Try and sync with the malicious node and check that it fails - tester := newTester(t, hashes, blocks) - tester.newPeer("attack", big.NewInt(10000), hashes[0]) - if err := tester.sync("attack", hashes[0]); err != nil { + tester := newTester() + tester.newPeer("attack", hashes, blocks) + if err := tester.downloader.synchronise("attack", hashes[0]); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } bs := tester.downloader.TakeBlocks() @@ -319,8 +324,8 @@ func TestNonExistingParentAttack(t *testing.T) { // Reconstruct a valid chain, and try to synchronize with it forged.ParentHeaderHash = knownHash - tester.newPeer("valid", big.NewInt(20000), hashes[0]) - if err := tester.sync("valid", hashes[0]); err != nil { + tester.newPeer("valid", hashes, blocks) + if err := tester.downloader.synchronise("valid", hashes[0]); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } bs = tester.downloader.TakeBlocks() @@ -341,12 +346,12 @@ func TestRepeatingHashAttack(t *testing.T) { forged := hashes[:len(hashes)-1] // Try and sync with the malicious node - tester := newTester(t, forged, blocks) - tester.newPeer("attack", big.NewInt(10000), forged[0]) + tester := newTester() + tester.newPeer("attack", forged, blocks) errc := make(chan error) go func() { - errc <- tester.sync("attack", hashes[0]) + errc <- tester.downloader.synchronise("attack", hashes[0]) }() // Make sure that syncing returns and does so with a failure @@ -359,9 +364,8 @@ func TestRepeatingHashAttack(t *testing.T) { } } // Ensure that a valid chain can still pass sync - tester.hashes = hashes - tester.newPeer("valid", big.NewInt(20000), hashes[0]) - if err := tester.sync("valid", hashes[0]); err != nil { + tester.newPeer("valid", hashes, blocks) + if err := tester.downloader.synchronise("valid", hashes[0]); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } } @@ -377,15 +381,15 @@ func TestNonExistingBlockAttack(t *testing.T) { hashes[len(hashes)/2] = unknownHash // Try and sync with the malicious node and check 
that it fails - tester := newTester(t, hashes, blocks) - tester.newPeer("attack", big.NewInt(10000), hashes[0]) - if err := tester.sync("attack", hashes[0]); err != errPeersUnavailable { + tester := newTester() + tester.newPeer("attack", hashes, blocks) + if err := tester.downloader.synchronise("attack", hashes[0]); err != errPeersUnavailable { t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errPeersUnavailable) } // Ensure that a valid chain can still pass sync hashes[len(hashes)/2] = origin - tester.newPeer("valid", big.NewInt(20000), hashes[0]) - if err := tester.sync("valid", hashes[0]); err != nil { + tester.newPeer("valid", hashes, blocks) + if err := tester.downloader.synchronise("valid", hashes[0]); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } } @@ -408,14 +412,13 @@ func TestInvalidHashOrderAttack(t *testing.T) { copy(reverse[blockCacheLimit:], chunk2) // Try and sync with the malicious node and check that it fails - tester := newTester(t, reverse, blocks) - tester.newPeer("attack", big.NewInt(10000), reverse[0]) + tester := newTester() + tester.newPeer("attack", reverse, blocks) if _, err := tester.syncTake("attack", reverse[0]); err != errInvalidChain { t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errInvalidChain) } // Ensure that a valid chain can still pass sync - tester.hashes = hashes - tester.newPeer("valid", big.NewInt(20000), hashes[0]) + tester.newPeer("valid", hashes, blocks) if _, err := tester.syncTake("valid", hashes[0]); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } @@ -431,8 +434,8 @@ func TestMadeupHashChainAttack(t *testing.T) { hashes := createHashes(0, 1024*blockCacheLimit) // Try and sync with the malicious node and check that it fails - tester := newTester(t, hashes, nil) - tester.newPeer("attack", big.NewInt(10000), hashes[0]) + tester := newTester() + tester.newPeer("attack", hashes, nil) if _, err := tester.syncTake("attack", hashes[0]); err != errCrossCheckFailed { t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errCrossCheckFailed) } @@ -445,11 +448,11 @@ func TestMadeupHashChainAttack(t *testing.T) { func TestMadeupHashChainDrippingAttack(t *testing.T) { // Create a random chain of hashes to drip hashes := createHashes(0, 16*blockCacheLimit) - tester := newTester(t, hashes, nil) + tester := newTester() // Try and sync with the attacker, one hash at a time tester.maxHashFetch = 1 - tester.newPeer("attack", big.NewInt(10000), hashes[0]) + tester.newPeer("attack", hashes, nil) if _, err := tester.syncTake("attack", hashes[0]); err != errStallingPeer { t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errStallingPeer) } @@ -473,8 +476,8 @@ func TestMadeupBlockChainAttack(t *testing.T) { gapped[i] = hashes[2*i] } // Try and sync with the malicious node and check that it fails - tester := newTester(t, gapped, blocks) - tester.newPeer("attack", big.NewInt(10000), gapped[0]) + tester := newTester() + tester.newPeer("attack", gapped, blocks) if _, err := tester.syncTake("attack", gapped[0]); err != errCrossCheckFailed { t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errCrossCheckFailed) } @@ -482,8 +485,7 @@ func TestMadeupBlockChainAttack(t *testing.T) { blockSoftTTL = defaultBlockTTL crossCheckCycle = defaultCrossCheckCycle - tester.hashes = hashes - tester.newPeer("valid", big.NewInt(20000), hashes[0]) + tester.newPeer("valid", hashes, blocks) if _, err := tester.syncTake("valid", hashes[0]); err != nil { t.Fatalf("failed to 
synchronise blocks: %v", err) } @@ -507,8 +509,8 @@ func TestMadeupParentBlockChainAttack(t *testing.T) { block.ParentHeaderHash = hash // Simulate pointing to already known hash } // Try and sync with the malicious node and check that it fails - tester := newTester(t, hashes, forges) - tester.newPeer("attack", big.NewInt(10000), hashes[0]) + tester := newTester() + tester.newPeer("attack", hashes, forges) if _, err := tester.syncTake("attack", hashes[0]); err != errCrossCheckFailed { t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errCrossCheckFailed) } @@ -516,8 +518,7 @@ func TestMadeupParentBlockChainAttack(t *testing.T) { blockSoftTTL = defaultBlockTTL crossCheckCycle = defaultCrossCheckCycle - tester.blocks = blocks - tester.newPeer("valid", big.NewInt(20000), hashes[0]) + tester.newPeer("valid", hashes, blocks) if _, err := tester.syncTake("valid", hashes[0]); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } @@ -534,12 +535,12 @@ func TestBannedChainStarvationAttack(t *testing.T) { blocks := createBlocksFromHashes(hashes) // Create the tester and ban the selected hash - tester := newTester(t, hashes, blocks) + tester := newTester() tester.downloader.banned.Add(bannedHash) // Iteratively try to sync, and verify that the banned hash list grows until // the head of the invalid chain is blocked too. - tester.newPeer("attack", big.NewInt(10000), hashes[0]) + tester.newPeer("attack", hashes, blocks) for banned := tester.downloader.banned.Size(); ; { // Try to sync with the attacker, check hash chain failure if _, err := tester.syncTake("attack", hashes[0]); err != errInvalidChain { @@ -556,7 +557,7 @@ func TestBannedChainStarvationAttack(t *testing.T) { banned = bans } // Check that after banning an entire chain, bad peers get dropped - if err := tester.newPeer("new attacker", big.NewInt(10000), hashes[0]); err != errBannedHead { + if err := tester.newPeer("new attacker", hashes, blocks); err != errBannedHead { t.Fatalf("peer registration mismatch: have %v, want %v", err, errBannedHead) } if peer := tester.downloader.peers.Peer("net attacker"); peer != nil { @@ -579,12 +580,12 @@ func TestBannedChainMemoryExhaustionAttack(t *testing.T) { blocks := createBlocksFromHashes(hashes) // Create the tester and ban the selected hash - tester := newTester(t, hashes, blocks) + tester := newTester() tester.downloader.banned.Add(bannedHash) // Iteratively try to sync, and verify that the banned hash list grows until // the head of the invalid chain is blocked too. 
-	tester.newPeer("attack", big.NewInt(10000), hashes[0])
+	tester.newPeer("attack", hashes, blocks)
 	for {
 		// Try to sync with the attacker, check hash chain failure
 		if _, err := tester.syncTake("attack", hashes[0]); err != errInvalidChain {

From faae8b7dd844f4faa2b9e2c8ce76246341ed7357 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?=
Date: Thu, 11 Jun 2015 18:12:51 +0300
Subject: [PATCH 04/12] eth: fix an accidental test compile error

---
 eth/protocol_test.go | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/eth/protocol_test.go b/eth/protocol_test.go
index bbea9fc11..69d487c71 100644
--- a/eth/protocol_test.go
+++ b/eth/protocol_test.go
@@ -11,7 +11,6 @@ import (
 	"github.com/ethereum/go-ethereum/core"
 	"github.com/ethereum/go-ethereum/core/types"
 	"github.com/ethereum/go-ethereum/crypto"
-	"github.com/ethereum/go-ethereum/eth/downloader"
 	"github.com/ethereum/go-ethereum/ethdb"
 	"github.com/ethereum/go-ethereum/event"
 	"github.com/ethereum/go-ethereum/p2p"
@@ -168,8 +167,7 @@ func newProtocolManagerForTesting(txAdded chan<- []*types.Transaction) *Protocol
 		db, _    = ethdb.NewMemDatabase()
 		chain, _ = core.NewChainManager(core.GenesisBlock(0, db), db, db, core.FakePow{}, em)
 		txpool   = &fakeTxPool{added: txAdded}
-		dl       = downloader.New(em, chain.HasBlock, chain.GetBlock)
-		pm       = NewProtocolManager(ProtocolVersion, 0, em, txpool, chain, dl)
+		pm       = NewProtocolManager(ProtocolVersion, 0, em, txpool, chain)
 	)
 	pm.Start()
 	return pm

From 80833f813715cb151bbf165f462e38930fc4fccd Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?=
Date: Thu, 11 Jun 2015 18:13:13 +0300
Subject: [PATCH 05/12] eth/downloader: instrument and test the sync peer drop

---
 eth/downloader/downloader.go      | 14 ++++---
 eth/downloader/downloader_test.go | 61 +++++++++++++++++++++++++++++--
 2 files changed, 66 insertions(+), 9 deletions(-)

diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index 499b3a585..88ceeb5ac 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -31,7 +31,6 @@ var (
 )
 
 var (
-	errLowTd            = errors.New("peers TD is too low")
 	errBusy             = errors.New("busy")
 	errUnknownPeer      = errors.New("peer is unknown or unhealthy")
 	errBadPeer          = errors.New("action from bad peer ignored")
@@ -94,8 +93,9 @@ type Downloader struct {
 	dropPeer peerDropFn // Drops a peer for misbehaving
 
 	// Status
-	synchronising int32
-	notified      int32
+	synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing
+	synchronising   int32
+	notified        int32
 
 	// Channels
 	newPeerCh chan *peer
@@ -202,7 +202,7 @@ func (d *Downloader) Synchronise(id string, head common.Hash) {
 	case errBusy:
 		glog.V(logger.Detail).Infof("Synchronisation already in progress")
 
-	case errTimeout, errBadPeer, errEmptyHashSet, errInvalidChain, errCrossCheckFailed:
+	case errTimeout, errBadPeer, errStallingPeer, errBannedHead, errEmptyHashSet, errPeersUnavailable, errInvalidChain, errCrossCheckFailed:
 		glog.V(logger.Debug).Infof("Removing peer %v: %v", id, err)
 		d.dropPeer(id)
 
@@ -218,6 +218,10 @@
 // it will use the best peer possible and synchronize if its TD is higher than our own. If any of the
 // checks fail an error will be returned.
This method is synchronous func (d *Downloader) synchronise(id string, hash common.Hash) error { + // Mock out the synchonisation if testing + if d.synchroniseMock != nil { + return d.synchroniseMock(id, hash) + } // Make sure only one goroutine is ever allowed past this point at once if !atomic.CompareAndSwapInt32(&d.synchronising, 0, 1) { return errBusy @@ -226,7 +230,7 @@ func (d *Downloader) synchronise(id string, hash common.Hash) error { // If the head hash is banned, terminate immediately if d.banned.Has(hash) { - return errInvalidChain + return errBannedHead } // Post a user notification of the sync (only once per session) if atomic.CompareAndSwapInt32(&d.notified, 0, 1) { diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 6269ed87c..c328e7d9a 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -2,6 +2,7 @@ package downloader import ( "encoding/binary" + "fmt" "math/big" "testing" "time" @@ -69,7 +70,7 @@ func newTester() *downloadTester { peerBlocks: make(map[string]map[common.Hash]*types.Block), } var mux event.TypeMux - downloader := New(&mux, tester.hasBlock, tester.getBlock, nil) + downloader := New(&mux, tester.hasBlock, tester.getBlock, tester.dropPeer) tester.downloader = downloader return tester @@ -130,6 +131,14 @@ func (dl *downloadTester) newPeer(id string, hashes []common.Hash, blocks map[co return err } +// dropPeer simulates a hard peer removal from the connection pool. +func (dl *downloadTester) dropPeer(id string) { + delete(dl.peerHashes, id) + delete(dl.peerBlocks, id) + + dl.downloader.UnregisterPeer(id) +} + // peerGetBlocksFn constructs a getHashes function associated with a particular // peer in the download tester. The returned function can be used to retrieve // batches of hashes from the particularly requested peer. @@ -544,14 +553,14 @@ func TestBannedChainStarvationAttack(t *testing.T) { for banned := tester.downloader.banned.Size(); ; { // Try to sync with the attacker, check hash chain failure if _, err := tester.syncTake("attack", hashes[0]); err != errInvalidChain { + if tester.downloader.banned.Has(hashes[0]) && err == errBannedHead { + break + } t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errInvalidChain) } // Check that the ban list grew with at least 1 new item, or all banned bans := tester.downloader.banned.Size() if bans < banned+1 { - if tester.downloader.banned.Has(hashes[0]) { - break - } t.Fatalf("ban count mismatch: have %v, want %v+", bans, banned+1) } banned = bans @@ -606,3 +615,47 @@ func TestBannedChainMemoryExhaustionAttack(t *testing.T) { } } } + +// Tests that misbehaving peers are disconnected, whilst behaving ones are not. 
+func TestAttackerDropping(t *testing.T) { + // Define the disconnection requirement for individual errors + tests := []struct { + result error + drop bool + }{ + {nil, false}, // Sync succeeded, all is well + {errBusy, false}, // Sync is already in progress, no problem + {errUnknownPeer, false}, // Peer is unknown, was already dropped, don't double drop + {errBadPeer, true}, // Peer was deemed bad for some reason, drop it + {errStallingPeer, true}, // Peer was detected to be stalling, drop it + {errBannedHead, true}, // Peer's head hash is a known bad hash, drop it + {errNoPeers, false}, // No peers to download from, soft race, no issue + {errPendingQueue, false}, // There are blocks still cached, wait to exhaust, no issue + {errTimeout, true}, // No hashes received in due time, drop the peer + {errEmptyHashSet, true}, // No hashes were returned as a response, drop as it's a dead end + {errPeersUnavailable, true}, // Nobody had the advertised blocks, drop the advertiser + {errInvalidChain, true}, // Hash chain was detected as invalid, definitely drop + {errCrossCheckFailed, true}, // Hash-origin failed to pass a block cross check, drop + {errCancelHashFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop + {errCancelBlockFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop + } + // Run the tests and check disconnection status + tester := newTester() + for i, tt := range tests { + // Register a new peer and ensure it's presence + id := fmt.Sprintf("test %d", i) + if err := tester.newPeer(id, []common.Hash{knownHash}, nil); err != nil { + t.Fatalf("test %d: failed to register new peer: %v", i, err) + } + if _, ok := tester.peerHashes[id]; !ok { + t.Fatalf("test %d: registered peer not found", i) + } + // Simulate a synchronisation and check the required result + tester.downloader.synchroniseMock = func(string, common.Hash) error { return tt.result } + + tester.downloader.Synchronise(id, knownHash) + if _, ok := tester.peerHashes[id]; !ok != tt.drop { + t.Errorf("test %d: peer drop mismatch for %v: have %v, want %v", i, tt.result, !ok, tt.drop) + } + } +} From 0fc71877a7d7a46f35147f753cba0de7b937c77a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Thu, 11 Jun 2015 20:22:40 +0300 Subject: [PATCH 06/12] eth/downloader: add valid peer during attacks (check interference) --- eth/downloader/downloader_test.go | 247 ++++++++++++++++++------------ 1 file changed, 147 insertions(+), 100 deletions(-) diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index c328e7d9a..5b85f01fb 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -14,23 +14,29 @@ import ( ) var ( - knownHash = common.Hash{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0} - unknownHash = common.Hash{9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9} - bannedHash = common.Hash{5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5} + knownHash = common.Hash{1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1} + unknownHash = common.Hash{2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2} + bannedHash = common.Hash{3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3} genesis = createBlock(1, common.Hash{}, knownHash) ) -func 
createHashes(start, amount int) (hashes []common.Hash) { - hashes = make([]common.Hash, amount+1) - hashes[len(hashes)-1] = knownHash +// idCounter is used by the createHashes method the generate deterministic but unique hashes +var idCounter = int64(2) // #1 is the genesis block - for i := range hashes[:len(hashes)-1] { - binary.BigEndian.PutUint64(hashes[i][:8], uint64(start+i+2)) +// createHashes generates a batch of hashes rooted at a specific point in the chain. +func createHashes(amount int, root common.Hash) (hashes []common.Hash) { + hashes = make([]common.Hash, amount+1) + hashes[len(hashes)-1] = root + + for i := 0; i < len(hashes)-1; i++ { + binary.BigEndian.PutUint64(hashes[i][:8], uint64(idCounter)) + idCounter++ } return } +// createBlock assembles a new block at the given chain height. func createBlock(i int, parent, hash common.Hash) *types.Block { header := &types.Header{Number: big.NewInt(int64(i))} block := types.NewBlockWithHeader(header) @@ -39,6 +45,11 @@ func createBlock(i int, parent, hash common.Hash) *types.Block { return block } +// copyBlock makes a deep copy of a block suitable for local modifications. +func copyBlock(block *types.Block) *types.Block { + return createBlock(int(block.Number().Int64()), block.ParentHeaderHash, block.HeaderHash) +} + func createBlocksFromHashes(hashes []common.Hash) map[common.Hash]*types.Block { blocks := make(map[common.Hash]*types.Block) for i := 0; i < len(hashes); i++ { @@ -76,10 +87,15 @@ func newTester() *downloadTester { return tester } +// sync starts synchronizing with a remote peer, blocking until it completes. +func (dl *downloadTester) sync(id string) error { + return dl.downloader.synchronise(id, dl.peerHashes[id][0]) +} + // syncTake is starts synchronising with a remote peer, but concurrently it also // starts fetching blocks that the downloader retrieved. IT blocks until both go // routines terminate. 
-func (dl *downloadTester) syncTake(peerId string, head common.Hash) ([]*Block, error) { +func (dl *downloadTester) syncTake(id string) ([]*Block, error) { // Start a block collector to take blocks as they become available done := make(chan struct{}) took := []*Block{} @@ -102,7 +118,7 @@ func (dl *downloadTester) syncTake(peerId string, head common.Hash) ([]*Block, e done <- struct{}{} }() // Start the downloading, sync the taker and return - err := dl.downloader.synchronise(peerId, head) + err := dl.sync(id) done <- struct{}{} <-done @@ -124,9 +140,14 @@ func (dl *downloadTester) getBlock(hash common.Hash) *types.Block { func (dl *downloadTester) newPeer(id string, hashes []common.Hash, blocks map[common.Hash]*types.Block) error { err := dl.downloader.RegisterPeer(id, hashes[0], dl.peerGetHashesFn(id), dl.peerGetBlocksFn(id)) if err == nil { - // Assign the owned hashes and blocks to the peer - dl.peerHashes[id] = hashes - dl.peerBlocks[id] = blocks + // Assign the owned hashes and blocks to the peer (deep copy) + dl.peerHashes[id] = make([]common.Hash, len(hashes)) + copy(dl.peerHashes[id], hashes) + + dl.peerBlocks[id] = make(map[common.Hash]*types.Block) + for hash, block := range blocks { + dl.peerBlocks[id][hash] = copyBlock(block) + } } return err } @@ -192,14 +213,14 @@ func (dl *downloadTester) peerGetBlocksFn(id string) func([]common.Hash) error { func TestSynchronisation(t *testing.T) { // Create a small enough block chain to download and the tester targetBlocks := blockCacheLimit - 15 - hashes := createHashes(0, targetBlocks) + hashes := createHashes(targetBlocks, knownHash) blocks := createBlocksFromHashes(hashes) tester := newTester() tester.newPeer("peer", hashes, blocks) // Synchronise with the peer and make sure all blocks were retrieved - if err := tester.downloader.synchronise("peer", hashes[0]); err != nil { + if err := tester.sync("peer"); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } if queued := len(tester.downloader.queue.blockPool); queued != targetBlocks { @@ -211,14 +232,14 @@ func TestSynchronisation(t *testing.T) { func TestBlockTaking(t *testing.T) { // Create a small enough block chain to download and the tester targetBlocks := blockCacheLimit - 15 - hashes := createHashes(0, targetBlocks) + hashes := createHashes(targetBlocks, knownHash) blocks := createBlocksFromHashes(hashes) tester := newTester() tester.newPeer("peer", hashes, blocks) // Synchronise with the peer and test block retrieval - if err := tester.downloader.synchronise("peer", hashes[0]); err != nil { + if err := tester.sync("peer"); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } if took := tester.downloader.TakeBlocks(); len(took) != targetBlocks { @@ -243,14 +264,14 @@ func TestInactiveDownloader(t *testing.T) { func TestCancel(t *testing.T) { // Create a small enough block chain to download and the tester targetBlocks := blockCacheLimit - 15 - hashes := createHashes(0, targetBlocks) + hashes := createHashes(targetBlocks, knownHash) blocks := createBlocksFromHashes(hashes) tester := newTester() tester.newPeer("peer", hashes, blocks) // Synchronise with the peer, but cancel afterwards - if err := tester.downloader.synchronise("peer", hashes[0]); err != nil { + if err := tester.sync("peer"); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } if !tester.downloader.Cancel() { @@ -271,7 +292,7 @@ func TestCancel(t *testing.T) { func TestThrottling(t *testing.T) { // Create a long block chain to download and the tester targetBlocks := 8 * 
blockCacheLimit - hashes := createHashes(0, targetBlocks) + hashes := createHashes(targetBlocks, knownHash) blocks := createBlocksFromHashes(hashes) tester := newTester() @@ -280,7 +301,7 @@ func TestThrottling(t *testing.T) { // Start a synchronisation concurrently errc := make(chan error) go func() { - errc <- tester.downloader.synchronise("peer", hashes[0]) + errc <- tester.sync("peer") }() // Iteratively take some blocks, always checking the retrieval count for total := 0; total < targetBlocks; { @@ -309,17 +330,20 @@ func TestThrottling(t *testing.T) { // Tests that if a peer returns an invalid chain with a block pointing to a non- // existing parent, it is correctly detected and handled. func TestNonExistingParentAttack(t *testing.T) { - // Forge a single-link chain with a forged header - hashes := createHashes(0, 1) - blocks := createBlocksFromHashes(hashes) + tester := newTester() - forged := blocks[hashes[0]] - forged.ParentHeaderHash = unknownHash + // Forge a single-link chain with a forged header + hashes := createHashes(1, knownHash) + blocks := createBlocksFromHashes(hashes) + tester.newPeer("valid", hashes, blocks) + + hashes = createHashes(1, knownHash) + blocks = createBlocksFromHashes(hashes) + blocks[hashes[0]].ParentHeaderHash = unknownHash + tester.newPeer("attack", hashes, blocks) // Try and sync with the malicious node and check that it fails - tester := newTester() - tester.newPeer("attack", hashes, blocks) - if err := tester.downloader.synchronise("attack", hashes[0]); err != nil { + if err := tester.sync("attack"); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } bs := tester.downloader.TakeBlocks() @@ -331,10 +355,8 @@ func TestNonExistingParentAttack(t *testing.T) { } tester.downloader.Cancel() - // Reconstruct a valid chain, and try to synchronize with it - forged.ParentHeaderHash = knownHash - tester.newPeer("valid", hashes, blocks) - if err := tester.downloader.synchronise("valid", hashes[0]); err != nil { + // Try to synchronize with the valid chain and make sure it succeeds + if err := tester.sync("valid"); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } bs = tester.downloader.TakeBlocks() @@ -348,21 +370,20 @@ func TestNonExistingParentAttack(t *testing.T) { // Tests that if a malicious peers keeps sending us repeating hashes, we don't // loop indefinitely. -func TestRepeatingHashAttack(t *testing.T) { +func TestRepeatingHashAttack(t *testing.T) { // TODO: Is this thing valid?? 
+ tester := newTester() + // Create a valid chain, but drop the last link - hashes := createHashes(0, blockCacheLimit) + hashes := createHashes(blockCacheLimit, knownHash) blocks := createBlocksFromHashes(hashes) - forged := hashes[:len(hashes)-1] + tester.newPeer("valid", hashes, blocks) + tester.newPeer("attack", hashes[:len(hashes)-1], blocks) // Try and sync with the malicious node - tester := newTester() - tester.newPeer("attack", forged, blocks) - errc := make(chan error) go func() { - errc <- tester.downloader.synchronise("attack", hashes[0]) + errc <- tester.sync("attack") }() - // Make sure that syncing returns and does so with a failure select { case <-time.After(time.Second): @@ -373,8 +394,7 @@ func TestRepeatingHashAttack(t *testing.T) { } } // Ensure that a valid chain can still pass sync - tester.newPeer("valid", hashes, blocks) - if err := tester.downloader.synchronise("valid", hashes[0]); err != nil { + if err := tester.sync("valid"); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } } @@ -382,23 +402,22 @@ func TestRepeatingHashAttack(t *testing.T) { // Tests that if a malicious peers returns a non-existent block hash, it should // eventually time out and the sync reattempted. func TestNonExistingBlockAttack(t *testing.T) { + tester := newTester() + // Create a valid chain, but forge the last link - hashes := createHashes(0, blockCacheLimit) + hashes := createHashes(blockCacheLimit, knownHash) blocks := createBlocksFromHashes(hashes) - origin := hashes[len(hashes)/2] + tester.newPeer("valid", hashes, blocks) hashes[len(hashes)/2] = unknownHash + tester.newPeer("attack", hashes, blocks) // Try and sync with the malicious node and check that it fails - tester := newTester() - tester.newPeer("attack", hashes, blocks) - if err := tester.downloader.synchronise("attack", hashes[0]); err != errPeersUnavailable { + if err := tester.sync("attack"); err != errPeersUnavailable { t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errPeersUnavailable) } // Ensure that a valid chain can still pass sync - hashes[len(hashes)/2] = origin - tester.newPeer("valid", hashes, blocks) - if err := tester.downloader.synchronise("valid", hashes[0]); err != nil { + if err := tester.sync("valid"); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } } @@ -406,29 +425,28 @@ func TestNonExistingBlockAttack(t *testing.T) { // Tests that if a malicious peer is returning hashes in a weird order, that the // sync throttler doesn't choke on them waiting for the valid blocks. 
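The repeating-hash test above relies on a bounded-wait idiom: the sync runs in its own goroutine while a select against time.After guarantees the test fails instead of hanging if the downloader loops on the repeated hashes. A minimal, generic sketch of that idiom (syncWithTimeout is a hypothetical helper, not part of this patch, and assumes only the standard errors and time packages):

// syncWithTimeout runs a blocking sync function on its own goroutine and gives
// up if it has not returned within the given limit, mirroring the select /
// time.After guard used by TestRepeatingHashAttack.
func syncWithTimeout(sync func() error, limit time.Duration) error {
	errc := make(chan error, 1) // buffered so a late return never leaks the goroutine
	go func() { errc <- sync() }()
	select {
	case err := <-errc:
		return err
	case <-time.After(limit):
		return errors.New("synchronisation did not return within " + limit.String())
	}
}

A caller would wrap the tester, for example syncWithTimeout(func() error { return tester.sync("attack") }, time.Second).
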
func TestInvalidHashOrderAttack(t *testing.T) { + tester := newTester() + // Create a valid long chain, but reverse some hashes within - hashes := createHashes(0, 4*blockCacheLimit) + hashes := createHashes(4*blockCacheLimit, knownHash) blocks := createBlocksFromHashes(hashes) + tester.newPeer("valid", hashes, blocks) chunk1 := make([]common.Hash, blockCacheLimit) chunk2 := make([]common.Hash, blockCacheLimit) copy(chunk1, hashes[blockCacheLimit:2*blockCacheLimit]) copy(chunk2, hashes[2*blockCacheLimit:3*blockCacheLimit]) - reverse := make([]common.Hash, len(hashes)) - copy(reverse, hashes) - copy(reverse[2*blockCacheLimit:], chunk1) - copy(reverse[blockCacheLimit:], chunk2) + copy(hashes[2*blockCacheLimit:], chunk1) + copy(hashes[blockCacheLimit:], chunk2) + tester.newPeer("attack", hashes, blocks) // Try and sync with the malicious node and check that it fails - tester := newTester() - tester.newPeer("attack", reverse, blocks) - if _, err := tester.syncTake("attack", reverse[0]); err != errInvalidChain { + if _, err := tester.syncTake("attack"); err != errInvalidChain { t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errInvalidChain) } // Ensure that a valid chain can still pass sync - tester.newPeer("valid", hashes, blocks) - if _, err := tester.syncTake("valid", hashes[0]); err != nil { + if _, err := tester.syncTake("valid"); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } } @@ -436,18 +454,25 @@ func TestInvalidHashOrderAttack(t *testing.T) { // Tests that if a malicious peer makes up a random hash chain and tries to push // indefinitely, it actually gets caught with it. func TestMadeupHashChainAttack(t *testing.T) { + tester := newTester() blockSoftTTL = 100 * time.Millisecond crossCheckCycle = 25 * time.Millisecond // Create a long chain of hashes without backing blocks - hashes := createHashes(0, 1024*blockCacheLimit) + hashes := createHashes(4*blockCacheLimit, knownHash) + blocks := createBlocksFromHashes(hashes) + + tester.newPeer("valid", hashes, blocks) + tester.newPeer("attack", createHashes(1024*blockCacheLimit, knownHash), nil) // Try and sync with the malicious node and check that it fails - tester := newTester() - tester.newPeer("attack", hashes, nil) - if _, err := tester.syncTake("attack", hashes[0]); err != errCrossCheckFailed { + if _, err := tester.syncTake("attack"); err != errCrossCheckFailed { t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errCrossCheckFailed) } + // Ensure that a valid chain can still pass sync + if _, err := tester.syncTake("valid"); err != nil { + t.Fatalf("failed to synchronise blocks: %v", err) + } } // Tests that if a malicious peer makes up a random hash chain, and tries to push @@ -456,13 +481,13 @@ func TestMadeupHashChainAttack(t *testing.T) { // one by one prevents reliable block/parent verification. 
func TestMadeupHashChainDrippingAttack(t *testing.T) { // Create a random chain of hashes to drip - hashes := createHashes(0, 16*blockCacheLimit) + hashes := createHashes(16*blockCacheLimit, knownHash) tester := newTester() // Try and sync with the attacker, one hash at a time tester.maxHashFetch = 1 tester.newPeer("attack", hashes, nil) - if _, err := tester.syncTake("attack", hashes[0]); err != errStallingPeer { + if _, err := tester.syncTake("attack"); err != errStallingPeer { t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errStallingPeer) } } @@ -477,7 +502,7 @@ func TestMadeupBlockChainAttack(t *testing.T) { crossCheckCycle = 25 * time.Millisecond // Create a long chain of blocks and simulate an invalid chain by dropping every second - hashes := createHashes(0, 16*blockCacheLimit) + hashes := createHashes(16*blockCacheLimit, knownHash) blocks := createBlocksFromHashes(hashes) gapped := make([]common.Hash, len(hashes)/2) @@ -487,7 +512,7 @@ func TestMadeupBlockChainAttack(t *testing.T) { // Try and sync with the malicious node and check that it fails tester := newTester() tester.newPeer("attack", gapped, blocks) - if _, err := tester.syncTake("attack", gapped[0]); err != errCrossCheckFailed { + if _, err := tester.syncTake("attack"); err != errCrossCheckFailed { t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errCrossCheckFailed) } // Ensure that a valid chain can still pass sync @@ -495,7 +520,7 @@ func TestMadeupBlockChainAttack(t *testing.T) { crossCheckCycle = defaultCrossCheckCycle tester.newPeer("valid", hashes, blocks) - if _, err := tester.syncTake("valid", hashes[0]); err != nil { + if _, err := tester.syncTake("valid"); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } } @@ -504,6 +529,8 @@ func TestMadeupBlockChainAttack(t *testing.T) { // attacker make up a valid hashes for random blocks, but also forges the block // parents to point to existing hashes. 
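The made-up chain tests shrink the blockSoftTTL and crossCheckCycle package variables so the cross check expires quickly, then put the saved defaults back by hand before the final sync with the valid peer (so that sync runs with production timings). For tests that do not need the defaults restored mid-run, a deferred restore also survives an early t.Fatalf; a sketch under the assumption that it lives in the same downloader package (lowerCrossCheckTimers is a hypothetical helper, not part of the patch):

// lowerCrossCheckTimers shrinks the cross-check timers for the duration of a
// test and returns a closure that restores the saved defaults.
func lowerCrossCheckTimers(ttl, cycle time.Duration) (restore func()) {
	oldTTL, oldCycle := blockSoftTTL, crossCheckCycle
	blockSoftTTL, crossCheckCycle = ttl, cycle
	return func() {
		blockSoftTTL, crossCheckCycle = oldTTL, oldCycle
	}
}

Used as defer lowerCrossCheckTimers(100*time.Millisecond, 25*time.Millisecond)() at the top of a test, the globals are reset no matter how the test exits.
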
func TestMadeupParentBlockChainAttack(t *testing.T) { + tester := newTester() + defaultBlockTTL := blockSoftTTL defaultCrossCheckCycle := crossCheckCycle @@ -511,24 +538,24 @@ func TestMadeupParentBlockChainAttack(t *testing.T) { crossCheckCycle = 25 * time.Millisecond // Create a long chain of blocks and simulate an invalid chain by dropping every second - hashes := createHashes(0, 16*blockCacheLimit) + hashes := createHashes(16*blockCacheLimit, knownHash) blocks := createBlocksFromHashes(hashes) - forges := createBlocksFromHashes(hashes) - for hash, block := range forges { - block.ParentHeaderHash = hash // Simulate pointing to already known hash + tester.newPeer("valid", hashes, blocks) + + for _, block := range blocks { + block.ParentHeaderHash = knownHash // Simulate pointing to already known hash } + tester.newPeer("attack", hashes, blocks) + // Try and sync with the malicious node and check that it fails - tester := newTester() - tester.newPeer("attack", hashes, forges) - if _, err := tester.syncTake("attack", hashes[0]); err != errCrossCheckFailed { + if _, err := tester.syncTake("attack"); err != errCrossCheckFailed { t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errCrossCheckFailed) } // Ensure that a valid chain can still pass sync blockSoftTTL = defaultBlockTTL crossCheckCycle = defaultCrossCheckCycle - tester.newPeer("valid", hashes, blocks) - if _, err := tester.syncTake("valid", hashes[0]); err != nil { + if _, err := tester.syncTake("valid"); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } } @@ -537,22 +564,25 @@ func TestMadeupParentBlockChainAttack(t *testing.T) { // the downloader, it will not keep refetching the same chain indefinitely, but // gradually block pieces of it, until it's head is also blocked. func TestBannedChainStarvationAttack(t *testing.T) { - // Construct a valid chain, but ban one of the hashes in it - hashes := createHashes(0, 8*blockCacheLimit) - hashes[len(hashes)/2+23] = bannedHash // weird index to have non multiple of ban chunk size - - blocks := createBlocksFromHashes(hashes) - // Create the tester and ban the selected hash tester := newTester() tester.downloader.banned.Add(bannedHash) + // Construct a valid chain, for it and ban the fork + hashes := createHashes(8*blockCacheLimit, knownHash) + blocks := createBlocksFromHashes(hashes) + tester.newPeer("valid", hashes, blocks) + + fork := len(hashes)/2 - 23 + hashes = append(createHashes(4*blockCacheLimit, bannedHash), hashes[fork:]...) + blocks = createBlocksFromHashes(hashes) + tester.newPeer("attack", hashes, blocks) + // Iteratively try to sync, and verify that the banned hash list grows until // the head of the invalid chain is blocked too. 
- tester.newPeer("attack", hashes, blocks) for banned := tester.downloader.banned.Size(); ; { // Try to sync with the attacker, check hash chain failure - if _, err := tester.syncTake("attack", hashes[0]); err != errInvalidChain { + if _, err := tester.syncTake("attack"); err != errInvalidChain { if tester.downloader.banned.Has(hashes[0]) && err == errBannedHead { break } @@ -569,35 +599,45 @@ func TestBannedChainStarvationAttack(t *testing.T) { if err := tester.newPeer("new attacker", hashes, blocks); err != errBannedHead { t.Fatalf("peer registration mismatch: have %v, want %v", err, errBannedHead) } - if peer := tester.downloader.peers.Peer("net attacker"); peer != nil { + if peer := tester.downloader.peers.Peer("new attacker"); peer != nil { t.Fatalf("banned attacker registered: %v", peer) } + // Ensure that a valid chain can still pass sync + if _, err := tester.syncTake("valid"); err != nil { + t.Fatalf("failed to synchronise blocks: %v", err) + } } // Tests that if a peer sends excessively many/large invalid chains that are // gradually banned, it will have an upper limit on the consumed memory and also // the origin bad hashes will not be evacuated. func TestBannedChainMemoryExhaustionAttack(t *testing.T) { - // Reduce the test size a bit - MaxBlockFetch = 4 - maxBannedHashes = 256 - - // Construct a banned chain with more chunks than the ban limit - hashes := createHashes(0, maxBannedHashes*MaxBlockFetch) - hashes[len(hashes)-1] = bannedHash // weird index to have non multiple of ban chunk size - - blocks := createBlocksFromHashes(hashes) - // Create the tester and ban the selected hash tester := newTester() tester.downloader.banned.Add(bannedHash) + // Reduce the test size a bit + defaultMaxBlockFetch := MaxBlockFetch + defaultMaxBannedHashes := maxBannedHashes + + MaxBlockFetch = 4 + maxBannedHashes = 256 + + // Construct a banned chain with more chunks than the ban limit + hashes := createHashes(8*blockCacheLimit, knownHash) + blocks := createBlocksFromHashes(hashes) + tester.newPeer("valid", hashes, blocks) + + fork := len(hashes)/2 - 23 + hashes = append(createHashes(maxBannedHashes*MaxBlockFetch, bannedHash), hashes[fork:]...) + blocks = createBlocksFromHashes(hashes) + tester.newPeer("attack", hashes, blocks) + // Iteratively try to sync, and verify that the banned hash list grows until // the head of the invalid chain is blocked too. - tester.newPeer("attack", hashes, blocks) for { // Try to sync with the attacker, check hash chain failure - if _, err := tester.syncTake("attack", hashes[0]); err != errInvalidChain { + if _, err := tester.syncTake("attack"); err != errInvalidChain { t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errInvalidChain) } // Short circuit if the entire chain was banned @@ -614,6 +654,13 @@ func TestBannedChainMemoryExhaustionAttack(t *testing.T) { } } } + // Ensure that a valid chain can still pass sync + MaxBlockFetch = defaultMaxBlockFetch + maxBannedHashes = defaultMaxBannedHashes + + if _, err := tester.syncTake("valid"); err != nil { + t.Fatalf("failed to synchronise blocks: %v", err) + } } // Tests that misbehaving peers are disconnected, whilst behaving ones are not. 
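A detail worth calling out in the reworked fixtures: newPeer hands each simulated peer a deep copy of the hashes and blocks it is registered with. That is what lets these tests register the untouched chain as "valid" first and then mutate the very same slices and blocks for the "attack" peer (scrambling hashes in place, rewriting ParentHeaderHash, grafting a fork onto bannedHash) without corrupting the valid peer's view. The copyBlock helper used by newPeer is defined elsewhere in this test file; an illustrative stand-in, not the real helper, might look like:

// copyBlockSketch returns an independent block so a per-peer copy can have its
// fields overwritten without touching the block shared with other simulated
// peers. A plain struct copy suffices for what these tests rewrite, since
// ParentHeaderHash is a value field; the real copyBlock may do more.
func copyBlockSketch(block *types.Block) *types.Block {
	cpy := *block
	return &cpy
}
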
From fc7abd98865f3bdc6cc36258026db98a649cd577 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Fri, 12 Jun 2015 13:35:29 +0300 Subject: [PATCH 07/12] eth, eth/downloader: move block processing into the downlaoder --- eth/downloader/downloader.go | 177 ++++++++++++++------- eth/downloader/downloader_test.go | 256 +++++++++++++++--------------- eth/handler.go | 2 +- eth/sync.go | 53 +------ 4 files changed, 255 insertions(+), 233 deletions(-) diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 88ceeb5ac..1bbba11ed 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -3,6 +3,7 @@ package downloader import ( "bytes" "errors" + "math" "math/rand" "sync" "sync/atomic" @@ -28,25 +29,27 @@ var ( crossCheckCycle = time.Second // Period after which to check for expired cross checks maxBannedHashes = 4096 // Number of bannable hashes before phasing old ones out + maxBlockProcess = 256 // Number of blocks to import at once into the chain ) var ( - errBusy = errors.New("busy") - errUnknownPeer = errors.New("peer is unknown or unhealthy") - errBadPeer = errors.New("action from bad peer ignored") - errStallingPeer = errors.New("peer is stalling") - errBannedHead = errors.New("peer head hash already banned") - errNoPeers = errors.New("no peers to keep download active") - errPendingQueue = errors.New("pending items in queue") - errTimeout = errors.New("timeout") - errEmptyHashSet = errors.New("empty hash set by peer") - errPeersUnavailable = errors.New("no peers available or all peers tried for block download process") - errAlreadyInPool = errors.New("hash already in pool") - errInvalidChain = errors.New("retrieved hash chain is invalid") - errCrossCheckFailed = errors.New("block cross-check failed") - errCancelHashFetch = errors.New("hash fetching cancelled (requested)") - errCancelBlockFetch = errors.New("block downloading cancelled (requested)") - errNoSyncActive = errors.New("no sync active") + errBusy = errors.New("busy") + errUnknownPeer = errors.New("peer is unknown or unhealthy") + errBadPeer = errors.New("action from bad peer ignored") + errStallingPeer = errors.New("peer is stalling") + errBannedHead = errors.New("peer head hash already banned") + errNoPeers = errors.New("no peers to keep download active") + errPendingQueue = errors.New("pending items in queue") + errTimeout = errors.New("timeout") + errEmptyHashSet = errors.New("empty hash set by peer") + errPeersUnavailable = errors.New("no peers available or all peers tried for block download process") + errAlreadyInPool = errors.New("hash already in pool") + errInvalidChain = errors.New("retrieved hash chain is invalid") + errCrossCheckFailed = errors.New("block cross-check failed") + errCancelHashFetch = errors.New("hash fetching canceled (requested)") + errCancelBlockFetch = errors.New("block downloading canceled (requested)") + errCancelChainImport = errors.New("chain importing canceled (requested)") + errNoSyncActive = errors.New("no sync active") ) // hashCheckFn is a callback type for verifying a hash's presence in the local chain. @@ -55,6 +58,9 @@ type hashCheckFn func(common.Hash) bool // blockRetrievalFn is a callback type for retrieving a block from the local chain. type blockRetrievalFn func(common.Hash) *types.Block +// chainInsertFn is a callback type to insert a batch of blocks into the local chain. +type chainInsertFn func(types.Blocks) (int, error) + // peerDropFn is a callback type for dropping a peer detected as malicious. 
type peerDropFn func(id string) @@ -88,13 +94,15 @@ type Downloader struct { importLock sync.Mutex // Callbacks - hasBlock hashCheckFn // Checks if a block is present in the chain - getBlock blockRetrievalFn // Retrieves a block from the chain - dropPeer peerDropFn // Retrieved the TD of our own chain + hasBlock hashCheckFn // Checks if a block is present in the chain + getBlock blockRetrievalFn // Retrieves a block from the chain + insertChain chainInsertFn // Injects a batch of blocks into the chain + dropPeer peerDropFn // Retrieved the TD of our own chain // Status synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing synchronising int32 + processing int32 notified int32 // Channels @@ -113,18 +121,19 @@ type Block struct { } // New creates a new downloader to fetch hashes and blocks from remote peers. -func New(mux *event.TypeMux, hasBlock hashCheckFn, getBlock blockRetrievalFn, dropPeer peerDropFn) *Downloader { +func New(mux *event.TypeMux, hasBlock hashCheckFn, getBlock blockRetrievalFn, insertChain chainInsertFn, dropPeer peerDropFn) *Downloader { // Create the base downloader downloader := &Downloader{ - mux: mux, - queue: newQueue(), - peers: newPeerSet(), - hasBlock: hasBlock, - getBlock: getBlock, - dropPeer: dropPeer, - newPeerCh: make(chan *peer, 1), - hashCh: make(chan hashPack, 1), - blockCh: make(chan blockPack, 1), + mux: mux, + queue: newQueue(), + peers: newPeerSet(), + hasBlock: hasBlock, + getBlock: getBlock, + insertChain: insertChain, + dropPeer: dropPeer, + newPeerCh: make(chan *peer, 1), + hashCh: make(chan hashPack, 1), + blockCh: make(chan blockPack, 1), } // Inject all the known bad hashes downloader.banned = set.New() @@ -157,7 +166,7 @@ func (d *Downloader) Stats() (pending int, cached int, importing int, estimate t return } -// Synchronising returns the state of the downloader +// Synchronising returns whether the downloader is currently retrieving blocks. func (d *Downloader) Synchronising() bool { return atomic.LoadInt32(&d.synchronising) > 0 } @@ -260,19 +269,6 @@ func (d *Downloader) synchronise(id string, hash common.Hash) error { return d.syncWithPeer(p, hash) } -// TakeBlocks takes blocks from the queue and yields them to the caller. -func (d *Downloader) TakeBlocks() []*Block { - blocks := d.queue.TakeBlocks() - if len(blocks) > 0 { - d.importLock.Lock() - d.importStart = time.Now() - d.importQueue = blocks - d.importDone = 0 - d.importLock.Unlock() - } - return blocks -} - // Has checks if the downloader knows about a particular hash, meaning that its // either already downloaded of pending retrieval. func (d *Downloader) Has(hash common.Hash) bool { @@ -307,19 +303,16 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash) (err error) { // Cancel cancels all of the operations and resets the queue. It returns true // if the cancel operation was completed. -func (d *Downloader) Cancel() bool { - // If we're not syncing just return. 
- hs, bs := d.queue.Size() - if atomic.LoadInt32(&d.synchronising) == 0 && hs == 0 && bs == 0 { - return false - } +func (d *Downloader) Cancel() { // Close the current cancel channel d.cancelLock.Lock() - select { - case <-d.cancelCh: - // Channel was already closed - default: - close(d.cancelCh) + if d.cancelCh != nil { + select { + case <-d.cancelCh: + // Channel was already closed + default: + close(d.cancelCh) + } } d.cancelLock.Unlock() @@ -330,11 +323,11 @@ func (d *Downloader) Cancel() bool { d.importQueue = nil d.importDone = 0 d.importLock.Unlock() - - return true } -// XXX Make synchronous +// fetchHahes starts retrieving hashes backwards from a specific peer and hash, +// up until it finds a common ancestor. If the source peer times out, alternative +// ones are tried for continuation. func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { var ( start = time.Now() @@ -530,10 +523,13 @@ out: glog.V(logger.Detail).Infof("%s: no blocks delivered", peer) break } - // All was successful, promote the peer + // All was successful, promote the peer and potentially start processing peer.Promote() peer.SetIdle() glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blockPack.blocks)) + if atomic.LoadInt32(&d.processing) == 0 { + go d.process() + } case errInvalidChain: // The hash chain is invalid (blocks are not ordered properly), abort @@ -709,6 +705,71 @@ func (d *Downloader) banBlocks(peerId string, head common.Hash) error { } } +// process takes blocks from the queue and tries to import them into the chain. +func (d *Downloader) process() (err error) { + // Make sure only one goroutine is ever allowed to process blocks at once + if !atomic.CompareAndSwapInt32(&d.processing, 0, 1) { + return + } + // If the processor just exited, but there are freshly pending items, try to + // reenter. This is needed because the goroutine spinned up for processing + // the fresh blocks might have been rejected entry to to this present thread + // not yet releasing the `processing` state. + defer func() { + if err == nil && d.queue.GetHeadBlock() != nil { + err = d.process() + } + }() + // Release the lock upon exit (note, before checking for reentry!) 
+ defer atomic.StoreInt32(&d.processing, 0) + + // Fetch the current cancel channel to allow termination + d.cancelLock.RLock() + cancel := d.cancelCh + d.cancelLock.RUnlock() + + // Repeat the processing as long as there are blocks to import + for { + // Fetch the next batch of blocks + blocks := d.queue.TakeBlocks() + if len(blocks) == 0 { + return nil + } + // Reset the import statistics + d.importLock.Lock() + d.importStart = time.Now() + d.importQueue = blocks + d.importDone = 0 + d.importLock.Unlock() + + // Actually import the blocks + glog.V(logger.Debug).Infof("Inserting chain with %d blocks (#%v - #%v)\n", len(blocks), blocks[0].RawBlock.Number(), blocks[len(blocks)-1].RawBlock.Number()) + for len(blocks) != 0 { // TODO: quit + // Check for any termination requests + select { + case <-cancel: + return errCancelChainImport + default: + } + // Retrieve the first batch of blocks to insert + max := int(math.Min(float64(len(blocks)), float64(maxBlockProcess))) + raw := make(types.Blocks, 0, max) + for _, block := range blocks[:max] { + raw = append(raw, block.RawBlock) + } + // Try to inset the blocks, drop the originating peer if there's an error + index, err := d.insertChain(raw) + if err != nil { + glog.V(logger.Debug).Infoln("Block #%d import failed:", raw[index].NumberU64(), err) + d.dropPeer(blocks[index].OriginPeer) + d.Cancel() + return errCancelChainImport + } + blocks = blocks[max:] + } + } +} + // DeliverBlocks injects a new batch of blocks received from a remote node. // This is usually invoked through the BlocksMsg by the protocol handler. func (d *Downloader) DeliverBlocks(id string, blocks []*types.Block) error { diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 5b85f01fb..6cd141ef7 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -2,8 +2,10 @@ package downloader import ( "encoding/binary" + "errors" "fmt" "math/big" + "sync/atomic" "testing" "time" @@ -81,7 +83,7 @@ func newTester() *downloadTester { peerBlocks: make(map[string]map[common.Hash]*types.Block), } var mux event.TypeMux - downloader := New(&mux, tester.hasBlock, tester.getBlock, tester.dropPeer) + downloader := New(&mux, tester.hasBlock, tester.getBlock, tester.insertChain, tester.dropPeer) tester.downloader = downloader return tester @@ -89,44 +91,14 @@ func newTester() *downloadTester { // sync starts synchronizing with a remote peer, blocking until it completes. func (dl *downloadTester) sync(id string) error { - return dl.downloader.synchronise(id, dl.peerHashes[id][0]) + err := dl.downloader.synchronise(id, dl.peerHashes[id][0]) + for atomic.LoadInt32(&dl.downloader.processing) == 1 { + time.Sleep(time.Millisecond) + } + return err } -// syncTake is starts synchronising with a remote peer, but concurrently it also -// starts fetching blocks that the downloader retrieved. IT blocks until both go -// routines terminate. -func (dl *downloadTester) syncTake(id string) ([]*Block, error) { - // Start a block collector to take blocks as they become available - done := make(chan struct{}) - took := []*Block{} - go func() { - for running := true; running; { - select { - case <-done: - running = false - default: - time.Sleep(time.Millisecond) - } - // Take a batch of blocks and accumulate - blocks := dl.downloader.TakeBlocks() - for _, block := range blocks { - dl.ownHashes = append(dl.ownHashes, block.RawBlock.Hash()) - dl.ownBlocks[block.RawBlock.Hash()] = block.RawBlock - } - took = append(took, blocks...) 
- } - done <- struct{}{} - }() - // Start the downloading, sync the taker and return - err := dl.sync(id) - - done <- struct{}{} - <-done - - return took, err -} - -// hasBlock checks if a block is present in the testers canonical chain. +// hasBlock checks if a block is pres ent in the testers canonical chain. func (dl *downloadTester) hasBlock(hash common.Hash) bool { return dl.getBlock(hash) != nil } @@ -136,6 +108,18 @@ func (dl *downloadTester) getBlock(hash common.Hash) *types.Block { return dl.ownBlocks[hash] } +// insertChain injects a new batch of blocks into the simulated chain. +func (dl *downloadTester) insertChain(blocks types.Blocks) (int, error) { + for i, block := range blocks { + if _, ok := dl.ownBlocks[block.ParentHash()]; !ok { + return i, errors.New("unknown parent") + } + dl.ownHashes = append(dl.ownHashes, block.Hash()) + dl.ownBlocks[block.Hash()] = block + } + return len(blocks), nil +} + // newPeer registers a new block download source into the downloader. func (dl *downloadTester) newPeer(id string, hashes []common.Hash, blocks map[common.Hash]*types.Block) error { err := dl.downloader.RegisterPeer(id, hashes[0], dl.peerGetHashesFn(id), dl.peerGetBlocksFn(id)) @@ -223,27 +207,8 @@ func TestSynchronisation(t *testing.T) { if err := tester.sync("peer"); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } - if queued := len(tester.downloader.queue.blockPool); queued != targetBlocks { - t.Fatalf("synchronised block mismatch: have %v, want %v", queued, targetBlocks) - } -} - -// Tests that the synchronized blocks can be correctly retrieved. -func TestBlockTaking(t *testing.T) { - // Create a small enough block chain to download and the tester - targetBlocks := blockCacheLimit - 15 - hashes := createHashes(targetBlocks, knownHash) - blocks := createBlocksFromHashes(hashes) - - tester := newTester() - tester.newPeer("peer", hashes, blocks) - - // Synchronise with the peer and test block retrieval - if err := tester.sync("peer"); err != nil { - t.Fatalf("failed to synchronise blocks: %v", err) - } - if took := tester.downloader.TakeBlocks(); len(took) != targetBlocks { - t.Fatalf("took block mismatch: have %v, want %v", len(took), targetBlocks) + if imported := len(tester.ownBlocks); imported != targetBlocks+1 { + t.Fatalf("synchronised block mismatch: have %v, want %v", imported, targetBlocks+1) } } @@ -270,20 +235,20 @@ func TestCancel(t *testing.T) { tester := newTester() tester.newPeer("peer", hashes, blocks) - // Synchronise with the peer, but cancel afterwards - if err := tester.sync("peer"); err != nil { - t.Fatalf("failed to synchronise blocks: %v", err) - } - if !tester.downloader.Cancel() { - t.Fatalf("cancel operation failed") - } - // Make sure the queue reports empty and no blocks can be taken + // Make sure canceling works with a pristine downloader + tester.downloader.Cancel() hashCount, blockCount := tester.downloader.queue.Size() if hashCount > 0 || blockCount > 0 { t.Errorf("block or hash count mismatch: %d hashes, %d blocks, want 0", hashCount, blockCount) } - if took := tester.downloader.TakeBlocks(); len(took) != 0 { - t.Errorf("taken blocks mismatch: have %d, want %d", len(took), 0) + // Synchronise with the peer, but cancel afterwards + if err := tester.sync("peer"); err != nil { + t.Fatalf("failed to synchronise blocks: %v", err) + } + tester.downloader.Cancel() + hashCount, blockCount = tester.downloader.queue.Size() + if hashCount > 0 || blockCount > 0 { + t.Errorf("block or hash count mismatch: %d hashes, %d blocks, want 0", 
hashCount, blockCount) } } @@ -298,29 +263,46 @@ func TestThrottling(t *testing.T) { tester := newTester() tester.newPeer("peer", hashes, blocks) + // Wrap the importer to allow stepping + done := make(chan int) + tester.downloader.insertChain = func(blocks types.Blocks) (int, error) { + n, err := tester.insertChain(blocks) + done <- n + return n, err + } // Start a synchronisation concurrently errc := make(chan error) go func() { errc <- tester.sync("peer") }() // Iteratively take some blocks, always checking the retrieval count - for total := 0; total < targetBlocks; { - // Wait a bit for sync to complete + for len(tester.ownBlocks) < targetBlocks+1 { + // Wait a bit for sync to throttle itself + var cached int for start := time.Now(); time.Since(start) < 3*time.Second; { time.Sleep(25 * time.Millisecond) - if len(tester.downloader.queue.blockPool) == blockCacheLimit { + + cached = len(tester.downloader.queue.blockPool) + if cached == blockCacheLimit || len(tester.ownBlocks)+cached == targetBlocks+1 { break } } - // Fetch the next batch of blocks - took := tester.downloader.TakeBlocks() - if len(took) != blockCacheLimit { - t.Fatalf("block count mismatch: have %v, want %v", len(took), blockCacheLimit) + // Make sure we filled up the cache, then exhaust it + time.Sleep(25 * time.Millisecond) // give it a chance to screw up + if cached != blockCacheLimit && len(tester.ownBlocks)+cached < targetBlocks+1 { + t.Fatalf("block count mismatch: have %v, want %v", cached, blockCacheLimit) } - total += len(took) - if total > targetBlocks { - t.Fatalf("target block count mismatch: have %v, want %v", total, targetBlocks) + <-done // finish previous blocking import + for cached > maxBlockProcess { + cached -= <-done } + time.Sleep(25 * time.Millisecond) // yield to the insertion + } + <-done // finish the last blocking import + + // Check that we haven't pulled more blocks than available + if len(tester.ownBlocks) > targetBlocks+1 { + t.Fatalf("target block count mismatch: have %v, want %v", len(tester.ownBlocks), targetBlocks+1) } if err := <-errc; err != nil { t.Fatalf("block synchronization failed: %v", err) @@ -343,28 +325,18 @@ func TestNonExistingParentAttack(t *testing.T) { tester.newPeer("attack", hashes, blocks) // Try and sync with the malicious node and check that it fails - if err := tester.sync("attack"); err != nil { - t.Fatalf("failed to synchronise blocks: %v", err) + if err := tester.sync("attack"); err == nil { + t.Fatalf("block synchronization succeeded") } - bs := tester.downloader.TakeBlocks() - if len(bs) != 1 { - t.Fatalf("retrieved block mismatch: have %v, want %v", len(bs), 1) + if tester.hasBlock(hashes[0]) { + t.Fatalf("tester accepted unknown-parent block: %v", blocks[hashes[0]]) } - if tester.hasBlock(bs[0].RawBlock.ParentHash()) { - t.Fatalf("tester knows about the unknown hash") - } - tester.downloader.Cancel() - // Try to synchronize with the valid chain and make sure it succeeds if err := tester.sync("valid"); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } - bs = tester.downloader.TakeBlocks() - if len(bs) != 1 { - t.Fatalf("retrieved block mismatch: have %v, want %v", len(bs), 1) - } - if !tester.hasBlock(bs[0].RawBlock.ParentHash()) { - t.Fatalf("tester doesn't know about the origin hash") + if !tester.hasBlock(tester.peerHashes["valid"][0]) { + t.Fatalf("tester didn't accept known-parent block: %v", tester.peerBlocks["valid"][hashes[0]]) } } @@ -442,11 +414,11 @@ func TestInvalidHashOrderAttack(t *testing.T) { tester.newPeer("attack", hashes, 
blocks) // Try and sync with the malicious node and check that it fails - if _, err := tester.syncTake("attack"); err != errInvalidChain { + if err := tester.sync("attack"); err != errInvalidChain { t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errInvalidChain) } // Ensure that a valid chain can still pass sync - if _, err := tester.syncTake("valid"); err != nil { + if err := tester.sync("valid"); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } } @@ -466,11 +438,11 @@ func TestMadeupHashChainAttack(t *testing.T) { tester.newPeer("attack", createHashes(1024*blockCacheLimit, knownHash), nil) // Try and sync with the malicious node and check that it fails - if _, err := tester.syncTake("attack"); err != errCrossCheckFailed { + if err := tester.sync("attack"); err != errCrossCheckFailed { t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errCrossCheckFailed) } // Ensure that a valid chain can still pass sync - if _, err := tester.syncTake("valid"); err != nil { + if err := tester.sync("valid"); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } } @@ -487,7 +459,7 @@ func TestMadeupHashChainDrippingAttack(t *testing.T) { // Try and sync with the attacker, one hash at a time tester.maxHashFetch = 1 tester.newPeer("attack", hashes, nil) - if _, err := tester.syncTake("attack"); err != errStallingPeer { + if err := tester.sync("attack"); err != errStallingPeer { t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errStallingPeer) } } @@ -512,7 +484,7 @@ func TestMadeupBlockChainAttack(t *testing.T) { // Try and sync with the malicious node and check that it fails tester := newTester() tester.newPeer("attack", gapped, blocks) - if _, err := tester.syncTake("attack"); err != errCrossCheckFailed { + if err := tester.sync("attack"); err != errCrossCheckFailed { t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errCrossCheckFailed) } // Ensure that a valid chain can still pass sync @@ -520,7 +492,7 @@ func TestMadeupBlockChainAttack(t *testing.T) { crossCheckCycle = defaultCrossCheckCycle tester.newPeer("valid", hashes, blocks) - if _, err := tester.syncTake("valid"); err != nil { + if err := tester.sync("valid"); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } } @@ -548,14 +520,14 @@ func TestMadeupParentBlockChainAttack(t *testing.T) { tester.newPeer("attack", hashes, blocks) // Try and sync with the malicious node and check that it fails - if _, err := tester.syncTake("attack"); err != errCrossCheckFailed { + if err := tester.sync("attack"); err != errCrossCheckFailed { t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errCrossCheckFailed) } // Ensure that a valid chain can still pass sync blockSoftTTL = defaultBlockTTL crossCheckCycle = defaultCrossCheckCycle - if _, err := tester.syncTake("valid"); err != nil { + if err := tester.sync("valid"); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } } @@ -582,7 +554,7 @@ func TestBannedChainStarvationAttack(t *testing.T) { // the head of the invalid chain is blocked too. 
for banned := tester.downloader.banned.Size(); ; { // Try to sync with the attacker, check hash chain failure - if _, err := tester.syncTake("attack"); err != errInvalidChain { + if err := tester.sync("attack"); err != errInvalidChain { if tester.downloader.banned.Has(hashes[0]) && err == errBannedHead { break } @@ -603,7 +575,7 @@ func TestBannedChainStarvationAttack(t *testing.T) { t.Fatalf("banned attacker registered: %v", peer) } // Ensure that a valid chain can still pass sync - if _, err := tester.syncTake("valid"); err != nil { + if err := tester.sync("valid"); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } } @@ -637,7 +609,7 @@ func TestBannedChainMemoryExhaustionAttack(t *testing.T) { // the head of the invalid chain is blocked too. for { // Try to sync with the attacker, check hash chain failure - if _, err := tester.syncTake("attack"); err != errInvalidChain { + if err := tester.sync("attack"); err != errInvalidChain { t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errInvalidChain) } // Short circuit if the entire chain was banned @@ -658,33 +630,34 @@ func TestBannedChainMemoryExhaustionAttack(t *testing.T) { MaxBlockFetch = defaultMaxBlockFetch maxBannedHashes = defaultMaxBannedHashes - if _, err := tester.syncTake("valid"); err != nil { + if err := tester.sync("valid"); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } } // Tests that misbehaving peers are disconnected, whilst behaving ones are not. -func TestAttackerDropping(t *testing.T) { - // Define the disconnection requirement for individual errors +func TestHashAttackerDropping(t *testing.T) { + // Define the disconnection requirement for individual hash fetch errors tests := []struct { result error drop bool }{ - {nil, false}, // Sync succeeded, all is well - {errBusy, false}, // Sync is already in progress, no problem - {errUnknownPeer, false}, // Peer is unknown, was already dropped, don't double drop - {errBadPeer, true}, // Peer was deemed bad for some reason, drop it - {errStallingPeer, true}, // Peer was detected to be stalling, drop it - {errBannedHead, true}, // Peer's head hash is a known bad hash, drop it - {errNoPeers, false}, // No peers to download from, soft race, no issue - {errPendingQueue, false}, // There are blocks still cached, wait to exhaust, no issue - {errTimeout, true}, // No hashes received in due time, drop the peer - {errEmptyHashSet, true}, // No hashes were returned as a response, drop as it's a dead end - {errPeersUnavailable, true}, // Nobody had the advertised blocks, drop the advertiser - {errInvalidChain, true}, // Hash chain was detected as invalid, definitely drop - {errCrossCheckFailed, true}, // Hash-origin failed to pass a block cross check, drop - {errCancelHashFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop - {errCancelBlockFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop + {nil, false}, // Sync succeeded, all is well + {errBusy, false}, // Sync is already in progress, no problem + {errUnknownPeer, false}, // Peer is unknown, was already dropped, don't double drop + {errBadPeer, true}, // Peer was deemed bad for some reason, drop it + {errStallingPeer, true}, // Peer was detected to be stalling, drop it + {errBannedHead, true}, // Peer's head hash is a known bad hash, drop it + {errNoPeers, false}, // No peers to download from, soft race, no issue + {errPendingQueue, false}, // There are blocks still cached, wait to exhaust, no issue + {errTimeout, true}, // 
No hashes received in due time, drop the peer + {errEmptyHashSet, true}, // No hashes were returned as a response, drop as it's a dead end + {errPeersUnavailable, true}, // Nobody had the advertised blocks, drop the advertiser + {errInvalidChain, true}, // Hash chain was detected as invalid, definitely drop + {errCrossCheckFailed, true}, // Hash-origin failed to pass a block cross check, drop + {errCancelHashFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop + {errCancelBlockFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop + {errCancelChainImport, false}, // Synchronisation was canceled, origin may be innocent, don't drop } // Run the tests and check disconnection status tester := newTester() @@ -706,3 +679,38 @@ func TestAttackerDropping(t *testing.T) { } } } + +// Tests that feeding bad blocks will result in a peer drop. +func TestBlockAttackerDropping(t *testing.T) { + // Define the disconnection requirement for individual block import errors + tests := []struct { + failure bool + drop bool + }{{true, true}, {false, false}} + + // Run the tests and check disconnection status + tester := newTester() + for i, tt := range tests { + // Register a new peer and ensure it's presence + id := fmt.Sprintf("test %d", i) + if err := tester.newPeer(id, []common.Hash{common.Hash{}}, nil); err != nil { + t.Fatalf("test %d: failed to register new peer: %v", i, err) + } + if _, ok := tester.peerHashes[id]; !ok { + t.Fatalf("test %d: registered peer not found", i) + } + // Assemble a good or bad block, depending of the test + raw := createBlock(1, knownHash, common.Hash{}) + if tt.failure { + raw = createBlock(1, unknownHash, common.Hash{}) + } + block := &Block{OriginPeer: id, RawBlock: raw} + + // Simulate block processing and check the result + tester.downloader.queue.blockCache[0] = block + tester.downloader.process() + if _, ok := tester.peerHashes[id]; !ok != tt.drop { + t.Errorf("test %d: peer drop mismatch for %v: have %v, want %v", i, tt.failure, !ok, tt.drop) + } + } +} diff --git a/eth/handler.go b/eth/handler.go index ac7fb8fcf..ec4f2d53a 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -80,7 +80,7 @@ func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpo txsyncCh: make(chan *txsync), quitSync: make(chan struct{}), } - manager.downloader = downloader.New(manager.eventMux, manager.chainman.HasBlock, manager.chainman.GetBlock, manager.removePeer) + manager.downloader = downloader.New(manager.eventMux, manager.chainman.HasBlock, manager.chainman.GetBlock, manager.chainman.InsertChain, manager.removePeer) manager.SubProtocol = p2p.Protocol{ Name: "eth", Version: uint(protocolVersion), diff --git a/eth/sync.go b/eth/sync.go index b127ca979..88a76805c 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -1,9 +1,7 @@ package eth import ( - "math" "math/rand" - "sync/atomic" "time" "github.com/ethereum/go-ethereum/common" @@ -15,12 +13,10 @@ import ( const ( forceSyncCycle = 10 * time.Second // Time interval to force syncs, even if few peers are available - blockProcCycle = 500 * time.Millisecond // Time interval to check for new blocks to process notifyCheckCycle = 100 * time.Millisecond // Time interval to allow hash notifies to fulfill before hard fetching notifyArriveTimeout = 500 * time.Millisecond // Time allowance before an announced block is explicitly requested notifyFetchTimeout = 5 * time.Second // Maximum alloted time to return an explicitly requested block minDesiredPeerCount = 5 // Amount of peers 
desired to start syncing - blockProcAmount = 256 // This is the target size for the packs of transactions sent by txsyncLoop. // A pack can get larger than this if a single transactions exceeds this size. @@ -254,10 +250,10 @@ func (pm *ProtocolManager) fetcher() { // syncer is responsible for periodically synchronising with the network, both // downloading hashes and blocks as well as retrieving cached ones. func (pm *ProtocolManager) syncer() { - forceSync := time.Tick(forceSyncCycle) - blockProc := time.Tick(blockProcCycle) - blockProcPend := int32(0) + // Abort any pending syncs if we terminate + defer pm.downloader.Cancel() + forceSync := time.Tick(forceSyncCycle) for { select { case <-pm.newPeerCh: @@ -271,55 +267,12 @@ func (pm *ProtocolManager) syncer() { // Force a sync even if not enough peers are present go pm.synchronise(pm.peers.BestPeer()) - case <-blockProc: - // Try to pull some blocks from the downloaded - if atomic.CompareAndSwapInt32(&blockProcPend, 0, 1) { - go func() { - pm.processBlocks() - atomic.StoreInt32(&blockProcPend, 0) - }() - } - case <-pm.quitSync: return } } } -// processBlocks retrieves downloaded blocks from the download cache and tries -// to construct the local block chain with it. Note, since the block retrieval -// order matters, access to this function *must* be synchronized/serialized. -func (pm *ProtocolManager) processBlocks() error { - pm.wg.Add(1) - defer pm.wg.Done() - - // Short circuit if no blocks are available for insertion - blocks := pm.downloader.TakeBlocks() - if len(blocks) == 0 { - return nil - } - glog.V(logger.Debug).Infof("Inserting chain with %d blocks (#%v - #%v)\n", len(blocks), blocks[0].RawBlock.Number(), blocks[len(blocks)-1].RawBlock.Number()) - - for len(blocks) != 0 && !pm.quit { - // Retrieve the first batch of blocks to insert - max := int(math.Min(float64(len(blocks)), float64(blockProcAmount))) - raw := make(types.Blocks, 0, max) - for _, block := range blocks[:max] { - raw = append(raw, block.RawBlock) - } - // Try to inset the blocks, drop the originating peer if there's an error - index, err := pm.chainman.InsertChain(raw) - if err != nil { - glog.V(logger.Debug).Infoln("Downloaded block import failed:", err) - pm.removePeer(blocks[index].OriginPeer) - pm.downloader.Cancel() - return err - } - blocks = blocks[max:] - } - return nil -} - // synchronise tries to sync up our local block chain with a remote peer, both // adding various sanity checks as well as wrapping it with various log entries. func (pm *ProtocolManager) synchronise(peer *peer) { From 30a9939388ac738aba39eb64c287bbf9bbda91c9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Fri, 12 Jun 2015 14:36:44 +0300 Subject: [PATCH 08/12] eth/downloader: sanity test for multi peer syncs --- eth/downloader/downloader_test.go | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 6cd141ef7..9803ae534 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -309,6 +309,37 @@ func TestThrottling(t *testing.T) { } } +// Tests that synchronisation from multiple peers works as intended (multi thread sanity test). 
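In the test added below every peer serves a suffix of one canonical, head-first hash chain (peer #0 gets the whole thing, peer #i starts i*blockCacheLimit hashes below the head), so syncing with the middle peer should import exactly as many blocks as that peer advertises, and a follow-up sync with peer #0 tops the chain up to full height. A standalone sketch of the slicing, with toy numbers rather than the real fixtures:

package main

import "fmt"

func main() {
	const peers, cacheLimit = 4, 8
	chain := make([]int, peers*cacheLimit+1) // head-first chain; the last entry plays the known genesis hash
	for i := range chain {
		chain[i] = len(chain) - 1 - i // heights 32, 31, ..., 0
	}
	for i := 0; i < peers; i++ {
		suffix := chain[i*cacheLimit:] // what peer #i would be registered with
		fmt.Printf("peer #%d serves %d hashes, head at height %d\n", i, len(suffix), suffix[0])
	}
}
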
+func TestMultiSynchronisation(t *testing.T) { + // Create various peers with various parts of the chain + targetPeers := 16 + targetBlocks := targetPeers*blockCacheLimit - 15 + + hashes := createHashes(targetBlocks, knownHash) + blocks := createBlocksFromHashes(hashes) + + tester := newTester() + for i := 0; i < targetPeers; i++ { + id := fmt.Sprintf("peer #%d", i) + tester.newPeer(id, hashes[i*blockCacheLimit:], blocks) + } + // Synchronise with the middle peer and make sure half of the blocks were retrieved + id := fmt.Sprintf("peer #%d", targetPeers/2) + if err := tester.sync(id); err != nil { + t.Fatalf("failed to synchronise blocks: %v", err) + } + if imported := len(tester.ownBlocks); imported != len(tester.peerHashes[id]) { + t.Fatalf("synchronised block mismatch: have %v, want %v", imported, len(tester.peerHashes[id])) + } + // Synchronise with the best peer and make sure everything is retrieved + if err := tester.sync("peer #0"); err != nil { + t.Fatalf("failed to synchronise blocks: %v", err) + } + if imported := len(tester.ownBlocks); imported != targetBlocks+1 { + t.Fatalf("synchronised block mismatch: have %v, want %v", imported, targetBlocks+1) + } +} + // Tests that if a peer returns an invalid chain with a block pointing to a non- // existing parent, it is correctly detected and handled. func TestNonExistingParentAttack(t *testing.T) { From b240983e2bafcde1c5902ce3a196b22475412f16 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Mon, 15 Jun 2015 12:26:05 +0300 Subject: [PATCH 09/12] eth, eth/downloader: do async block fetches, add dl tests --- eth/downloader/downloader_test.go | 48 +++++++++++++++++++++++++++++-- eth/downloader/peer.go | 2 +- eth/sync.go | 2 +- 3 files changed, 47 insertions(+), 5 deletions(-) diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 9803ae534..f71c16237 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -122,7 +122,14 @@ func (dl *downloadTester) insertChain(blocks types.Blocks) (int, error) { // newPeer registers a new block download source into the downloader. func (dl *downloadTester) newPeer(id string, hashes []common.Hash, blocks map[common.Hash]*types.Block) error { - err := dl.downloader.RegisterPeer(id, hashes[0], dl.peerGetHashesFn(id), dl.peerGetBlocksFn(id)) + return dl.newSlowPeer(id, hashes, blocks, 0) +} + +// newSlowPeer registers a new block download source into the downloader, with a +// specific delay time on processing the network packets sent to it, simulating +// potentially slow network IO. +func (dl *downloadTester) newSlowPeer(id string, hashes []common.Hash, blocks map[common.Hash]*types.Block, delay time.Duration) error { + err := dl.downloader.RegisterPeer(id, hashes[0], dl.peerGetHashesFn(id, delay), dl.peerGetBlocksFn(id, delay)) if err == nil { // Assign the owned hashes and blocks to the peer (deep copy) dl.peerHashes[id] = make([]common.Hash, len(hashes)) @@ -147,8 +154,10 @@ func (dl *downloadTester) dropPeer(id string) { // peerGetBlocksFn constructs a getHashes function associated with a particular // peer in the download tester. The returned function can be used to retrieve // batches of hashes from the particularly requested peer. 
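The newSlowPeer helper added above threads a per-peer delay into the two peer callbacks whose hunks follow, so a single time.Sleep stands in for slow network IO. The same effect can be had by decorating any existing callback; a sketch (withDelay is hypothetical and assumes the test file's existing time and common imports):

// withDelay decorates a block-fetch callback so every request first sleeps,
// simulating a peer that is slow on the wire without changing the downloader.
func withDelay(delay time.Duration, fetch func([]common.Hash) error) func([]common.Hash) error {
	return func(hashes []common.Hash) error {
		time.Sleep(delay)
		return fetch(hashes)
	}
}
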
-func (dl *downloadTester) peerGetHashesFn(id string) func(head common.Hash) error { +func (dl *downloadTester) peerGetHashesFn(id string, delay time.Duration) func(head common.Hash) error { return func(head common.Hash) error { + time.Sleep(delay) + limit := MaxHashFetch if dl.maxHashFetch > 0 { limit = dl.maxHashFetch @@ -178,8 +187,10 @@ func (dl *downloadTester) peerGetHashesFn(id string) func(head common.Hash) erro // peerGetBlocksFn constructs a getBlocks function associated with a particular // peer in the download tester. The returned function can be used to retrieve // batches of blocks from the particularly requested peer. -func (dl *downloadTester) peerGetBlocksFn(id string) func([]common.Hash) error { +func (dl *downloadTester) peerGetBlocksFn(id string, delay time.Duration) func([]common.Hash) error { return func(hashes []common.Hash) error { + time.Sleep(delay) + blocks := dl.peerBlocks[id] result := make([]*types.Block, 0, len(hashes)) for _, hash := range hashes { @@ -340,6 +351,37 @@ func TestMultiSynchronisation(t *testing.T) { } } +// Tests that synchronising with a peer who's very slow at network IO does not +// stall the other peers in the system. +func TestSlowSynchronisation(t *testing.T) { + tester := newTester() + + // Create a batch of blocks, with a slow and a full speed peer + targetCycles := 2 + targetBlocks := targetCycles*blockCacheLimit - 15 + targetIODelay := 500 * time.Millisecond + + hashes := createHashes(targetBlocks, knownHash) + blocks := createBlocksFromHashes(hashes) + + tester.newSlowPeer("fast", hashes, blocks, 0) + tester.newSlowPeer("slow", hashes, blocks, targetIODelay) + + // Try to sync with the peers (pull hashes from fast) + start := time.Now() + if err := tester.sync("fast"); err != nil { + t.Fatalf("failed to synchronise blocks: %v", err) + } + if imported := len(tester.ownBlocks); imported != targetBlocks+1 { + t.Fatalf("synchronised block mismatch: have %v, want %v", imported, targetBlocks+1) + } + // Check that the slow peer got hit at most once per block-cache-size import + limit := time.Duration(targetCycles+1) * targetIODelay + if delay := time.Since(start); delay >= limit { + t.Fatalf("synchronisation exceeded delay limit: have %v, want %v", delay, limit) + } +} + // Tests that if a peer returns an invalid chain with a block pointing to a non- // existing parent, it is correctly detected and handled. 
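TestSlowSynchronisation above bounds the whole sync at (targetCycles+1)*targetIODelay, which with two cache-fulls of blocks and a 500ms slow peer works out to 1.5s; the bound is only realistic because peer requests are now fired asynchronously (the peer.go hunk further down changes p.getBlocks(hashes) to go p.getBlocks(hashes)), so one peer's slow writes no longer serialise the others. A standalone toy illustration of that difference, not patch code:

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	// Fire one "network request" per peer concurrently: the wall time tracks the
	// slowest single peer rather than the sum of all delays.
	delays := map[string]time.Duration{
		"fast":   10 * time.Millisecond,
		"medium": 200 * time.Millisecond,
		"slow":   500 * time.Millisecond,
	}
	start := time.Now()

	var wg sync.WaitGroup
	for name, delay := range delays {
		wg.Add(1)
		go func(name string, delay time.Duration) {
			defer wg.Done()
			time.Sleep(delay) // stands in for the blocking write to the peer
			fmt.Println("request to", name, "peer issued")
		}(name, delay)
	}
	wg.Wait()
	fmt.Println("elapsed:", time.Since(start)) // roughly 500ms, not the 710ms sum
}
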
func TestNonExistingParentAttack(t *testing.T) { diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go index 9614a6951..f36e133e4 100644 --- a/eth/downloader/peer.go +++ b/eth/downloader/peer.go @@ -74,7 +74,7 @@ func (p *peer) Fetch(request *fetchRequest) error { for hash, _ := range request.Hashes { hashes = append(hashes, hash) } - p.getBlocks(hashes) + go p.getBlocks(hashes) return nil } diff --git a/eth/sync.go b/eth/sync.go index 88a76805c..917fc0fce 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -171,7 +171,7 @@ func (pm *ProtocolManager) fetcher() { // Send out all block requests for peer, hashes := range request { glog.V(logger.Debug).Infof("Explicitly fetching %d blocks from %s", len(hashes), peer.id) - peer.requestBlocks(hashes) + go peer.requestBlocks(hashes) } request = make(map[*peer][]common.Hash) From 9c03c374e32541c905adb9a3b8783cd721117030 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Mon, 15 Jun 2015 13:05:01 +0300 Subject: [PATCH 10/12] eth/downloader: fix import statistic reset, fetch hashes async --- eth/downloader/downloader.go | 25 ++++++++++++++----------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 1bbba11ed..7f8ef12ee 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -316,13 +316,8 @@ func (d *Downloader) Cancel() { } d.cancelLock.Unlock() - // Reset the queue and import statistics + // Reset the queue d.queue.Reset() - - d.importLock.Lock() - d.importQueue = nil - d.importDone = 0 - d.importLock.Unlock() } // fetchHahes starts retrieving hashes backwards from a specific peer and hash, @@ -345,7 +340,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { <-timeout.C // timeout channel should be initially empty. getHashes := func(from common.Hash) { - active.getHashes(from) + go active.getHashes(from) timeout.Reset(hashTTL) } @@ -414,9 +409,9 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { expire: time.Now().Add(blockSoftTTL), parent: parent, } - active.getBlocks([]common.Hash{origin}) + go active.getBlocks([]common.Hash{origin}) - // Also fetch a fresh + // Also fetch a fresh batch of hashes getHashes(head) continue } @@ -720,8 +715,16 @@ func (d *Downloader) process() (err error) { err = d.process() } }() - // Release the lock upon exit (note, before checking for reentry!) - defer atomic.StoreInt32(&d.processing, 0) + // Release the lock upon exit (note, before checking for reentry!), and set + // the import statistics to zero. 
+ defer func() { + d.importLock.Lock() + d.importQueue = nil + d.importDone = 0 + d.importLock.Unlock() + + atomic.StoreInt32(&d.processing, 0) + }() // Fetch the current cancel channel to allow termination d.cancelLock.RLock() From cf7c44a7f6e5422148e0be98186d5570ce3e0ac5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Mon, 15 Jun 2015 15:18:04 +0300 Subject: [PATCH 11/12] eth/downloader: detailed comment for the race corner case --- eth/downloader/downloader.go | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 7f8ef12ee..306c4fd2d 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -522,9 +522,7 @@ out: peer.Promote() peer.SetIdle() glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blockPack.blocks)) - if atomic.LoadInt32(&d.processing) == 0 { - go d.process() - } + go d.process() case errInvalidChain: // The hash chain is invalid (blocks are not ordered properly), abort @@ -701,6 +699,19 @@ func (d *Downloader) banBlocks(peerId string, head common.Hash) error { } // process takes blocks from the queue and tries to import them into the chain. +// +// The algorithmic flow is as follows: +// - The `processing` flag is swapped to 1 to ensure singleton access +// - The current `cancel` channel is retrieved to detect sync abortions +// - Blocks are iteratively taken from the cache and inserted into the chain +// - When the cache becomes empty, insertion stops +// - The `processing` flag is swapped back to 0 +// - A post-exit check is made whether new blocks became available +// - This step is important: it handles a potential race condition between +// checking for no more work, and releasing the processing "mutex". In +// between these state changes, a block may have arrived, but a processing +// attempt denied, so we need to re-enter to ensure the block isn't left +// to idle in the cache. func (d *Downloader) process() (err error) { // Make sure only one goroutine is ever allowed to process blocks at once if !atomic.CompareAndSwapInt32(&d.processing, 0, 1) { @@ -763,7 +774,7 @@ func (d *Downloader) process() (err error) { // Try to inset the blocks, drop the originating peer if there's an error index, err := d.insertChain(raw) if err != nil { - glog.V(logger.Debug).Infoln("Block #%d import failed:", raw[index].NumberU64(), err) + glog.V(logger.Debug).Infof("Block #%d import failed: %v", raw[index].NumberU64(), err) d.dropPeer(blocks[index].OriginPeer) d.Cancel() return errCancelChainImport From aa250e228a7f2eec5d512d05eb042b75e2755d30 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Mon, 15 Jun 2015 15:18:31 +0300 Subject: [PATCH 12/12] eth: don't refetch non fitting blocks to avoid duplicates --- eth/sync.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eth/sync.go b/eth/sync.go index 917fc0fce..a3b177a4d 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -214,7 +214,7 @@ func (pm *ProtocolManager) fetcher() { if announce := pending[hash]; announce != nil { // Drop the block if it surely cannot fit if pm.chainman.HasBlock(hash) || !pm.chainman.HasBlock(block.ParentHash()) { - delete(pending, hash) + // delete(pending, hash) // if we drop, it will re-fetch it, wait for timeout? continue } // Otherwise accumulate for import
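The long comment added to process() in the patch above pairs well with a stripped-down version of the pattern it describes: a single int32 flag gates the importer, the flag is released before the exit check, and the exit check re-enters if work arrived in the window where a concurrent delivery saw the flag still set and gave up. The sketch below is a generic illustration of that discipline, not the downloader's actual code; worker and its fields are invented for the example and only sync/atomic from the standard library is assumed.

package main

import (
	"fmt"
	"sync/atomic"
)

// worker mimics the downloader's one-importer-at-a-time discipline.
type worker struct {
	processing int32    // 1 while some goroutine owns the import loop
	queue      chan int // buffered channel standing in for the block cache
}

func (w *worker) process() {
	// Only one goroutine may import at once; concurrent callers simply return.
	if !atomic.CompareAndSwapInt32(&w.processing, 0, 1) {
		return
	}
	defer func() {
		// Release the flag first, then re-check for work that arrived while a
		// concurrent delivery was refused entry; without the re-check such an
		// item would sit in the queue until the next delivery woke us up.
		atomic.StoreInt32(&w.processing, 0)
		if len(w.queue) > 0 {
			w.process()
		}
	}()
	for {
		select {
		case item := <-w.queue:
			_ = item // insert into the chain, drop the originating peer on error, etc.
		default:
			return // cache drained
		}
	}
}

func main() {
	w := &worker{queue: make(chan int, 8)}
	for i := 0; i < 5; i++ {
		w.queue <- i
	}
	w.process()
	fmt.Println("items left unprocessed:", len(w.queue)) // 0
}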