Merge pull request #1866 from karalabe/honor-eth-capabilities
eth/downloader: match capabilities when querying idle peers
This commit is contained in:
		
						commit
						8b865fa9bf
					
				| @ -816,7 +816,7 @@ func (d *Downloader) fetchBlocks61(from uint64) error { | ||||
| 			} | ||||
| 			// Send a download request to all idle peers, until throttled
 | ||||
| 			throttled := false | ||||
| 			for _, peer := range d.peers.IdlePeers() { | ||||
| 			for _, peer := range d.peers.IdlePeers(eth61) { | ||||
| 				// Short circuit if throttling activated
 | ||||
| 				if d.queue.Throttle() { | ||||
| 					throttled = true | ||||
| @ -1255,7 +1255,7 @@ func (d *Downloader) fetchBodies(from uint64) error { | ||||
| 			} | ||||
| 			// Send a download request to all idle peers, until throttled
 | ||||
| 			queuedEmptyBlocks, throttled := false, false | ||||
| 			for _, peer := range d.peers.IdlePeers() { | ||||
| 			for _, peer := range d.peers.IdlePeers(eth62) { | ||||
| 				// Short circuit if throttling activated
 | ||||
| 				if d.queue.Throttle() { | ||||
| 					throttled = true | ||||
|  | ||||
| @ -205,9 +205,17 @@ func (dl *downloadTester) newSlowPeer(id string, version int, hashes []common.Ha | ||||
| 	dl.lock.Lock() | ||||
| 	defer dl.lock.Unlock() | ||||
| 
 | ||||
| 	err := dl.downloader.RegisterPeer(id, version, hashes[0], | ||||
| 		dl.peerGetRelHashesFn(id, delay), dl.peerGetAbsHashesFn(id, delay), dl.peerGetBlocksFn(id, delay), | ||||
| 		dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay)) | ||||
| 	var err error | ||||
| 	switch version { | ||||
| 	case 61: | ||||
| 		err = dl.downloader.RegisterPeer(id, version, hashes[0], dl.peerGetRelHashesFn(id, delay), dl.peerGetAbsHashesFn(id, delay), dl.peerGetBlocksFn(id, delay), nil, nil, nil) | ||||
| 	case 62: | ||||
| 		err = dl.downloader.RegisterPeer(id, version, hashes[0], nil, nil, nil, dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay)) | ||||
| 	case 63: | ||||
| 		err = dl.downloader.RegisterPeer(id, version, hashes[0], nil, nil, nil, dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay)) | ||||
| 	case 64: | ||||
| 		err = dl.downloader.RegisterPeer(id, version, hashes[0], nil, nil, nil, dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay)) | ||||
| 	} | ||||
| 	if err == nil { | ||||
| 		// Assign the owned hashes and blocks to the peer (deep copy)
 | ||||
| 		dl.peerHashes[id] = make([]common.Hash, len(hashes)) | ||||
| @ -618,6 +626,41 @@ func testMultiSynchronisation(t *testing.T, protocol int) { | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| // Tests that synchronisations behave well in multi-version protocol environments
 | ||||
| // and not wreak havok on other nodes in the network.
 | ||||
| func TestMultiProtocolSynchronisation61(t *testing.T) { testMultiProtocolSynchronisation(t, 61) } | ||||
| func TestMultiProtocolSynchronisation62(t *testing.T) { testMultiProtocolSynchronisation(t, 62) } | ||||
| func TestMultiProtocolSynchronisation63(t *testing.T) { testMultiProtocolSynchronisation(t, 63) } | ||||
| func TestMultiProtocolSynchronisation64(t *testing.T) { testMultiProtocolSynchronisation(t, 64) } | ||||
| 
 | ||||
| func testMultiProtocolSynchronisation(t *testing.T, protocol int) { | ||||
| 	// Create a small enough block chain to download
 | ||||
| 	targetBlocks := blockCacheLimit - 15 | ||||
| 	hashes, blocks := makeChain(targetBlocks, 0, genesis) | ||||
| 
 | ||||
| 	// Create peers of every type
 | ||||
| 	tester := newTester() | ||||
| 	tester.newPeer("peer 61", 61, hashes, blocks) | ||||
| 	tester.newPeer("peer 62", 62, hashes, blocks) | ||||
| 	tester.newPeer("peer 63", 63, hashes, blocks) | ||||
| 	tester.newPeer("peer 64", 64, hashes, blocks) | ||||
| 
 | ||||
| 	// Synchronise with the requestd peer and make sure all blocks were retrieved
 | ||||
| 	if err := tester.sync(fmt.Sprintf("peer %d", protocol), nil); err != nil { | ||||
| 		t.Fatalf("failed to synchronise blocks: %v", err) | ||||
| 	} | ||||
| 	if imported := len(tester.ownBlocks); imported != targetBlocks+1 { | ||||
| 		t.Fatalf("synchronised block mismatch: have %v, want %v", imported, targetBlocks+1) | ||||
| 	} | ||||
| 	// Check that no peers have been dropped off
 | ||||
| 	for _, version := range []int{61, 62, 63, 64} { | ||||
| 		peer := fmt.Sprintf("peer %d", version) | ||||
| 		if _, ok := tester.peerHashes[peer]; !ok { | ||||
| 			t.Errorf("%s dropped", peer) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| // Tests that if a block is empty (i.e. header only), no body request should be
 | ||||
| // made, and instead the header should be assembled into a whole block in itself.
 | ||||
| func TestEmptyBlockShortCircuit62(t *testing.T) { testEmptyBlockShortCircuit(t, 62) } | ||||
|  | ||||
| @ -312,14 +312,16 @@ func (ps *peerSet) AllPeers() []*peer { | ||||
| 
 | ||||
| // IdlePeers retrieves a flat list of all the currently idle peers within the
 | ||||
| // active peer set, ordered by their reputation.
 | ||||
| func (ps *peerSet) IdlePeers() []*peer { | ||||
| func (ps *peerSet) IdlePeers(version int) []*peer { | ||||
| 	ps.lock.RLock() | ||||
| 	defer ps.lock.RUnlock() | ||||
| 
 | ||||
| 	list := make([]*peer, 0, len(ps.peers)) | ||||
| 	for _, p := range ps.peers { | ||||
| 		if atomic.LoadInt32(&p.idle) == 0 { | ||||
| 			list = append(list, p) | ||||
| 		if (version == eth61 && p.version == eth61) || (version >= eth62 && p.version >= eth62) { | ||||
| 			if atomic.LoadInt32(&p.idle) == 0 { | ||||
| 				list = append(list, p) | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| 	for i := 0; i < len(list); i++ { | ||||
|  | ||||
		Loading…
	
		Reference in New Issue
	
	Block a user