eth/downloader: fix #910, thread safe peers & polishes
This commit is contained in:
		
							parent
							
								
									fe7e284709
								
							
						
					
					
						commit
						70c65835f4
					
				| @ -49,12 +49,6 @@ type blockPack struct { | |||||||
| 	blocks []*types.Block | 	blocks []*types.Block | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| type syncPack struct { |  | ||||||
| 	peer          *peer |  | ||||||
| 	hash          common.Hash |  | ||||||
| 	ignoreInitial bool |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| type hashPack struct { | type hashPack struct { | ||||||
| 	peerId string | 	peerId string | ||||||
| 	hashes []common.Hash | 	hashes []common.Hash | ||||||
| @ -63,7 +57,7 @@ type hashPack struct { | |||||||
| type Downloader struct { | type Downloader struct { | ||||||
| 	mu         sync.RWMutex | 	mu         sync.RWMutex | ||||||
| 	queue      *queue | 	queue      *queue | ||||||
| 	peers      peers | 	peers      *peerSet | ||||||
| 	activePeer string | 	activePeer string | ||||||
| 
 | 
 | ||||||
| 	// Callbacks
 | 	// Callbacks
 | ||||||
| @ -83,7 +77,7 @@ type Downloader struct { | |||||||
| func New(hasBlock hashCheckFn, getBlock getBlockFn) *Downloader { | func New(hasBlock hashCheckFn, getBlock getBlockFn) *Downloader { | ||||||
| 	downloader := &Downloader{ | 	downloader := &Downloader{ | ||||||
| 		queue:     newQueue(), | 		queue:     newQueue(), | ||||||
| 		peers:     make(peers), | 		peers:     newPeerSet(), | ||||||
| 		hasBlock:  hasBlock, | 		hasBlock:  hasBlock, | ||||||
| 		getBlock:  getBlock, | 		getBlock:  getBlock, | ||||||
| 		newPeerCh: make(chan *peer, 1), | 		newPeerCh: make(chan *peer, 1), | ||||||
| @ -98,29 +92,26 @@ func (d *Downloader) Stats() (current int, max int) { | |||||||
| 	return d.queue.Size() | 	return d.queue.Size() | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (d *Downloader) RegisterPeer(id string, hash common.Hash, getHashes hashFetcherFn, getBlocks blockFetcherFn) error { | // RegisterPeer injects a new download peer into the set of block source to be
 | ||||||
| 	d.mu.Lock() | // used for fetching hashes and blocks from.
 | ||||||
| 	defer d.mu.Unlock() | func (d *Downloader) RegisterPeer(id string, head common.Hash, getHashes hashFetcherFn, getBlocks blockFetcherFn) error { | ||||||
| 
 | 	glog.V(logger.Detail).Infoln("Registering peer", id) | ||||||
| 	glog.V(logger.Detail).Infoln("Register peer", id) | 	if err := d.peers.Register(newPeer(id, head, getHashes, getBlocks)); err != nil { | ||||||
| 
 | 		glog.V(logger.Error).Infoln("Register failed:", err) | ||||||
| 	// Create a new peer and add it to the list of known peers
 | 		return err | ||||||
| 	peer := newPeer(id, hash, getHashes, getBlocks) | 	} | ||||||
| 	// add peer to our peer set
 |  | ||||||
| 	d.peers[id] = peer |  | ||||||
| 	// broadcast new peer
 |  | ||||||
| 
 |  | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // UnregisterPeer unregisters a peer. This will prevent any action from the specified peer.
 | // UnregisterPeer remove a peer from the known list, preventing any action from
 | ||||||
| func (d *Downloader) UnregisterPeer(id string) { | // the specified peer.
 | ||||||
| 	d.mu.Lock() | func (d *Downloader) UnregisterPeer(id string) error { | ||||||
| 	defer d.mu.Unlock() | 	glog.V(logger.Detail).Infoln("Unregistering peer", id) | ||||||
| 
 | 	if err := d.peers.Unregister(id); err != nil { | ||||||
| 	glog.V(logger.Detail).Infoln("Unregister peer", id) | 		glog.V(logger.Error).Infoln("Unregister failed:", err) | ||||||
| 
 | 		return err | ||||||
| 	delete(d.peers, id) | 	} | ||||||
|  | 	return nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // Synchronise will select the peer and use it for synchronising. If an empty string is given
 | // Synchronise will select the peer and use it for synchronising. If an empty string is given
 | ||||||
| @ -140,15 +131,16 @@ func (d *Downloader) Synchronise(id string, hash common.Hash) error { | |||||||
| 	if _, cached := d.queue.Size(); cached > 0 && d.queue.GetHeadBlock() != nil { | 	if _, cached := d.queue.Size(); cached > 0 && d.queue.GetHeadBlock() != nil { | ||||||
| 		return errPendingQueue | 		return errPendingQueue | ||||||
| 	} | 	} | ||||||
| 	// Reset the queue to clean any internal leftover state
 | 	// Reset the queue and peer set to clean any internal leftover state
 | ||||||
| 	d.queue.Reset() | 	d.queue.Reset() | ||||||
|  | 	d.peers.Reset() | ||||||
| 
 | 
 | ||||||
| 	// Retrieve the origin peer and initiate the downloading process
 | 	// Retrieve the origin peer and initiate the downloading process
 | ||||||
| 	p := d.peers[id] | 	p := d.peers.Peer(id) | ||||||
| 	if p == nil { | 	if p == nil { | ||||||
| 		return errUnknownPeer | 		return errUnknownPeer | ||||||
| 	} | 	} | ||||||
| 	return d.getFromPeer(p, hash, false) | 	return d.syncWithPeer(p, hash) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // TakeBlocks takes blocks from the queue and yields them to the blockTaker handler
 | // TakeBlocks takes blocks from the queue and yields them to the blockTaker handler
 | ||||||
| @ -167,7 +159,9 @@ func (d *Downloader) Has(hash common.Hash) bool { | |||||||
| 	return d.queue.Has(hash) | 	return d.queue.Has(hash) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (d *Downloader) getFromPeer(p *peer, hash common.Hash, ignoreInitial bool) (err error) { | // syncWithPeer starts a block synchronization based on the hash chain from the
 | ||||||
|  | // specified peer and head hash.
 | ||||||
|  | func (d *Downloader) syncWithPeer(p *peer, hash common.Hash) (err error) { | ||||||
| 	d.activePeer = p.id | 	d.activePeer = p.id | ||||||
| 	defer func() { | 	defer func() { | ||||||
| 		// reset on error
 | 		// reset on error
 | ||||||
| @ -177,21 +171,12 @@ func (d *Downloader) getFromPeer(p *peer, hash common.Hash, ignoreInitial bool) | |||||||
| 	}() | 	}() | ||||||
| 
 | 
 | ||||||
| 	glog.V(logger.Debug).Infoln("Synchronizing with the network using:", p.id) | 	glog.V(logger.Debug).Infoln("Synchronizing with the network using:", p.id) | ||||||
| 	// Start the fetcher. This will block the update entirely
 | 	if err = d.fetchHashes(p, hash); err != nil { | ||||||
| 	// interupts need to be send to the appropriate channels
 |  | ||||||
| 	// respectively.
 |  | ||||||
| 	if err = d.startFetchingHashes(p, hash, ignoreInitial); err != nil { |  | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
| 
 | 	if err = d.fetchBlocks(); err != nil { | ||||||
| 	// Start fetching blocks in paralel. The strategy is simple
 |  | ||||||
| 	// take any available peers, seserve a chunk for each peer available,
 |  | ||||||
| 	// let the peer deliver the chunkn and periodically check if a peer
 |  | ||||||
| 	// has timedout.
 |  | ||||||
| 	if err = d.startFetchingBlocks(p); err != nil { |  | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
| 
 |  | ||||||
| 	glog.V(logger.Debug).Infoln("Synchronization completed") | 	glog.V(logger.Debug).Infoln("Synchronization completed") | ||||||
| 
 | 
 | ||||||
| 	return nil | 	return nil | ||||||
| @ -234,17 +219,14 @@ blockDone: | |||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // XXX Make synchronous
 | // XXX Make synchronous
 | ||||||
| func (d *Downloader) startFetchingHashes(p *peer, h common.Hash, ignoreInitial bool) error { | func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { | ||||||
| 	glog.V(logger.Debug).Infof("Downloading hashes (%x) from %s", h[:4], p.id) | 	glog.V(logger.Debug).Infof("Downloading hashes (%x) from %s", h[:4], p.id) | ||||||
| 
 | 
 | ||||||
| 	start := time.Now() | 	start := time.Now() | ||||||
| 
 | 
 | ||||||
| 	// We ignore the initial hash in some cases (e.g. we received a block without it's parent)
 | 	// Add the hash to the queue first
 | ||||||
| 	// In such circumstances we don't need to download the block so don't add it to the queue.
 | 	d.queue.Insert([]common.Hash{h}) | ||||||
| 	if !ignoreInitial { | 
 | ||||||
| 		// Add the hash to the queue first
 |  | ||||||
| 		d.queue.Insert([]common.Hash{h}) |  | ||||||
| 	} |  | ||||||
| 	// Get the first batch of hashes
 | 	// Get the first batch of hashes
 | ||||||
| 	p.getHashes(h) | 	p.getHashes(h) | ||||||
| 
 | 
 | ||||||
| @ -308,20 +290,18 @@ out: | |||||||
| 			// Attempt to find a new peer by checking inclusion of peers best hash in our
 | 			// Attempt to find a new peer by checking inclusion of peers best hash in our
 | ||||||
| 			// already fetched hash list. This can't guarantee 100% correctness but does
 | 			// already fetched hash list. This can't guarantee 100% correctness but does
 | ||||||
| 			// a fair job. This is always either correct or false incorrect.
 | 			// a fair job. This is always either correct or false incorrect.
 | ||||||
| 			for id, peer := range d.peers { | 			for _, peer := range d.peers.AllPeers() { | ||||||
| 				if d.queue.Has(peer.recentHash) && !attemptedPeers[id] { | 				if d.queue.Has(peer.head) && !attemptedPeers[p.id] { | ||||||
| 					p = peer | 					p = peer | ||||||
| 					break | 					break | ||||||
| 				} | 				} | ||||||
| 			} | 			} | ||||||
| 
 |  | ||||||
| 			// if all peers have been tried, abort the process entirely or if the hash is
 | 			// if all peers have been tried, abort the process entirely or if the hash is
 | ||||||
| 			// the zero hash.
 | 			// the zero hash.
 | ||||||
| 			if p == nil || (hash == common.Hash{}) { | 			if p == nil || (hash == common.Hash{}) { | ||||||
| 				d.queue.Reset() | 				d.queue.Reset() | ||||||
| 				return ErrTimeout | 				return ErrTimeout | ||||||
| 			} | 			} | ||||||
| 
 |  | ||||||
| 			// set p to the active peer. this will invalidate any hashes that may be returned
 | 			// set p to the active peer. this will invalidate any hashes that may be returned
 | ||||||
| 			// by our previous (delayed) peer.
 | 			// by our previous (delayed) peer.
 | ||||||
| 			activePeer = p | 			activePeer = p | ||||||
| @ -334,14 +314,11 @@ out: | |||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (d *Downloader) startFetchingBlocks(p *peer) error { | // fetchBlocks iteratively downloads the entire schedules block-chain, taking
 | ||||||
|  | // any available peers, reserving a chunk of blocks for each, wait for delivery
 | ||||||
|  | // and periodically checking for timeouts.
 | ||||||
|  | func (d *Downloader) fetchBlocks() error { | ||||||
| 	glog.V(logger.Debug).Infoln("Downloading", d.queue.Pending(), "block(s)") | 	glog.V(logger.Debug).Infoln("Downloading", d.queue.Pending(), "block(s)") | ||||||
| 
 |  | ||||||
| 	// Defer the peer reset. This will empty the peer requested set
 |  | ||||||
| 	// and makes sure there are no lingering peers with an incorrect
 |  | ||||||
| 	// state
 |  | ||||||
| 	defer d.peers.reset() |  | ||||||
| 
 |  | ||||||
| 	start := time.Now() | 	start := time.Now() | ||||||
| 
 | 
 | ||||||
| 	// default ticker for re-fetching blocks every now and then
 | 	// default ticker for re-fetching blocks every now and then
 | ||||||
| @ -354,19 +331,19 @@ out: | |||||||
| 		case blockPack := <-d.blockCh: | 		case blockPack := <-d.blockCh: | ||||||
| 			// If the peer was previously banned and failed to deliver it's pack
 | 			// If the peer was previously banned and failed to deliver it's pack
 | ||||||
| 			// in a reasonable time frame, ignore it's message.
 | 			// in a reasonable time frame, ignore it's message.
 | ||||||
| 			if d.peers[blockPack.peerId] != nil { | 			if peer := d.peers.Peer(blockPack.peerId); peer != nil { | ||||||
| 				err := d.queue.Deliver(blockPack.peerId, blockPack.blocks) | 				// Deliver the received chunk of blocks, but drop the peer if invalid
 | ||||||
| 				if err != nil { | 				if err := d.queue.Deliver(blockPack.peerId, blockPack.blocks); err != nil { | ||||||
| 					glog.V(logger.Debug).Infof("deliver failed for peer %s: %v\n", blockPack.peerId, err) | 					glog.V(logger.Debug).Infof("Failed delivery for peer %s: %v\n", blockPack.peerId, err) | ||||||
| 					// FIXME d.UnregisterPeer(blockPack.peerId)
 | 					d.peers.Unregister(blockPack.peerId) | ||||||
| 					break | 					break | ||||||
| 				} | 				} | ||||||
| 
 |  | ||||||
| 				if glog.V(logger.Debug) { | 				if glog.V(logger.Debug) { | ||||||
| 					glog.Infof("adding %d blocks from: %s\n", len(blockPack.blocks), blockPack.peerId) | 					glog.Infof("Added %d blocks from: %s\n", len(blockPack.blocks), blockPack.peerId) | ||||||
| 				} | 				} | ||||||
| 				d.peers[blockPack.peerId].promote() | 				// Promote the peer and update it's idle state
 | ||||||
| 				d.peers.setState(blockPack.peerId, idleState) | 				peer.Promote() | ||||||
|  | 				peer.SetIdle() | ||||||
| 			} | 			} | ||||||
| 		case <-ticker.C: | 		case <-ticker.C: | ||||||
| 			// Check for bad peers. Bad peers may indicate a peer not responding
 | 			// Check for bad peers. Bad peers may indicate a peer not responding
 | ||||||
| @ -381,13 +358,10 @@ out: | |||||||
| 				// 1) Time for them to respond;
 | 				// 1) Time for them to respond;
 | ||||||
| 				// 2) Measure their speed;
 | 				// 2) Measure their speed;
 | ||||||
| 				// 3) Amount and availability.
 | 				// 3) Amount and availability.
 | ||||||
| 				if peer := d.peers[pid]; peer != nil { | 				d.peers.Unregister(pid) | ||||||
| 					peer.demote() |  | ||||||
| 					peer.reset() |  | ||||||
| 				} |  | ||||||
| 			} | 			} | ||||||
| 			// After removing bad peers make sure we actually have sufficient peer left to keep downloading
 | 			// After removing bad peers make sure we actually have sufficient peer left to keep downloading
 | ||||||
| 			if len(d.peers) == 0 { | 			if d.peers.Peers() == 0 { | ||||||
| 				d.queue.Reset() | 				d.queue.Reset() | ||||||
| 				return errNoPeers | 				return errNoPeers | ||||||
| 			} | 			} | ||||||
| @ -398,31 +372,29 @@ out: | |||||||
| 				if d.queue.Throttle() { | 				if d.queue.Throttle() { | ||||||
| 					continue | 					continue | ||||||
| 				} | 				} | ||||||
| 
 | 				// Send a download request to all idle peers
 | ||||||
| 				availablePeers := d.peers.get(idleState) | 				idlePeers := d.peers.IdlePeers() | ||||||
| 				for _, peer := range availablePeers { | 				for _, peer := range idlePeers { | ||||||
| 					// Get a possible chunk. If nil is returned no chunk
 | 					// Get a possible chunk. If nil is returned no chunk
 | ||||||
| 					// could be returned due to no hashes available.
 | 					// could be returned due to no hashes available.
 | ||||||
| 					request := d.queue.Reserve(peer, maxBlockFetch) | 					request := d.queue.Reserve(peer, maxBlockFetch) | ||||||
| 					if request == nil { | 					if request == nil { | ||||||
| 						continue | 						continue | ||||||
| 					} | 					} | ||||||
| 					// XXX make fetch blocking.
 |  | ||||||
| 					// Fetch the chunk and check for error. If the peer was somehow
 | 					// Fetch the chunk and check for error. If the peer was somehow
 | ||||||
| 					// already fetching a chunk due to a bug, it will be returned to
 | 					// already fetching a chunk due to a bug, it will be returned to
 | ||||||
| 					// the queue
 | 					// the queue
 | ||||||
| 					if err := peer.fetch(request); err != nil { | 					if err := peer.Fetch(request); err != nil { | ||||||
| 						// log for tracing
 | 						glog.V(logger.Error).Infof("Peer %s received double work\n", peer.id) | ||||||
| 						glog.V(logger.Debug).Infof("peer %s received double work (state = %v)\n", peer.id, peer.state) |  | ||||||
| 						d.queue.Cancel(request) | 						d.queue.Cancel(request) | ||||||
| 					} | 					} | ||||||
| 				} | 				} | ||||||
| 				// make sure that we have peers available for fetching. If all peers have been tried
 | 				// Make sure that we have peers available for fetching. If all peers have been tried
 | ||||||
| 				// and all failed throw an error
 | 				// and all failed throw an error
 | ||||||
| 				if d.queue.InFlight() == 0 { | 				if d.queue.InFlight() == 0 { | ||||||
| 					d.queue.Reset() | 					d.queue.Reset() | ||||||
| 
 | 
 | ||||||
| 					return fmt.Errorf("%v peers avaialable = %d. total peers = %d. hashes needed = %d", errPeersUnavailable, len(availablePeers), len(d.peers), d.queue.Pending()) | 					return fmt.Errorf("%v peers available = %d. total peers = %d. hashes needed = %d", errPeersUnavailable, len(idlePeers), d.peers.Peers(), d.queue.Pending()) | ||||||
| 				} | 				} | ||||||
| 
 | 
 | ||||||
| 			} else if d.queue.InFlight() == 0 { | 			} else if d.queue.InFlight() == 0 { | ||||||
|  | |||||||
| @ -229,7 +229,7 @@ func TestThrottling(t *testing.T) { | |||||||
| 	minDesiredPeerCount = 4 | 	minDesiredPeerCount = 4 | ||||||
| 	blockTtl = 1 * time.Second | 	blockTtl = 1 * time.Second | ||||||
| 
 | 
 | ||||||
| 	targetBlocks := 4 * blockCacheLimit | 	targetBlocks := 16 * blockCacheLimit | ||||||
| 	hashes := createHashes(0, targetBlocks) | 	hashes := createHashes(0, targetBlocks) | ||||||
| 	blocks := createBlocksFromHashes(hashes) | 	blocks := createBlocksFromHashes(hashes) | ||||||
| 	tester := newTester(t, hashes, blocks) | 	tester := newTester(t, hashes, blocks) | ||||||
| @ -256,6 +256,7 @@ func TestThrottling(t *testing.T) { | |||||||
| 				return | 				return | ||||||
| 			default: | 			default: | ||||||
| 				took = append(took, tester.downloader.TakeBlocks()...) | 				took = append(took, tester.downloader.TakeBlocks()...) | ||||||
|  | 				time.Sleep(time.Millisecond) | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| 	}() | 	}() | ||||||
|  | |||||||
| @ -1,63 +1,35 @@ | |||||||
|  | // Contains the active peer-set of the downloader, maintaining both failures
 | ||||||
|  | // as well as reputation metrics to prioritize the block retrievals.
 | ||||||
|  | 
 | ||||||
| package downloader | package downloader | ||||||
| 
 | 
 | ||||||
| import ( | import ( | ||||||
| 	"errors" | 	"errors" | ||||||
| 	"sync" | 	"sync" | ||||||
|  | 	"sync/atomic" | ||||||
| 
 | 
 | ||||||
| 	"github.com/ethereum/go-ethereum/common" | 	"github.com/ethereum/go-ethereum/common" | ||||||
| 	"gopkg.in/fatih/set.v0" | 	"gopkg.in/fatih/set.v0" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| const ( |  | ||||||
| 	workingState = 2 |  | ||||||
| 	idleState    = 4 |  | ||||||
| ) |  | ||||||
| 
 |  | ||||||
| type hashFetcherFn func(common.Hash) error | type hashFetcherFn func(common.Hash) error | ||||||
| type blockFetcherFn func([]common.Hash) error | type blockFetcherFn func([]common.Hash) error | ||||||
| 
 | 
 | ||||||
| // XXX make threadsafe!!!!
 | var ( | ||||||
| type peers map[string]*peer | 	errAlreadyFetching   = errors.New("already fetching blocks from peer") | ||||||
|  | 	errAlreadyRegistered = errors.New("peer is already registered") | ||||||
|  | 	errNotRegistered     = errors.New("peer is not registered") | ||||||
|  | ) | ||||||
| 
 | 
 | ||||||
| func (p peers) reset() { | // peer represents an active peer from which hashes and blocks are retrieved.
 | ||||||
| 	for _, peer := range p { |  | ||||||
| 		peer.reset() |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (p peers) get(state int) []*peer { |  | ||||||
| 	var peers []*peer |  | ||||||
| 	for _, peer := range p { |  | ||||||
| 		peer.mu.RLock() |  | ||||||
| 		if peer.state == state { |  | ||||||
| 			peers = append(peers, peer) |  | ||||||
| 		} |  | ||||||
| 		peer.mu.RUnlock() |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	return peers |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (p peers) setState(id string, state int) { |  | ||||||
| 	if peer, exist := p[id]; exist { |  | ||||||
| 		peer.mu.Lock() |  | ||||||
| 		defer peer.mu.Unlock() |  | ||||||
| 		peer.state = state |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (p peers) getPeer(id string) *peer { |  | ||||||
| 	return p[id] |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| // peer represents an active peer
 |  | ||||||
| type peer struct { | type peer struct { | ||||||
| 	state int // Peer state (working, idle)
 | 	id   string      // Unique identifier of the peer
 | ||||||
| 	rep   int // TODO peer reputation
 | 	head common.Hash // Hash of the peers latest known block
 | ||||||
| 
 | 
 | ||||||
| 	mu         sync.RWMutex | 	idle int32 // Current activity state of the peer (idle = 0, active = 1)
 | ||||||
| 	id         string | 	rep  int32 // Simple peer reputation (not used currently)
 | ||||||
| 	recentHash common.Hash | 
 | ||||||
|  | 	mu sync.RWMutex | ||||||
| 
 | 
 | ||||||
| 	ignored *set.Set | 	ignored *set.Set | ||||||
| 
 | 
 | ||||||
| @ -65,31 +37,31 @@ type peer struct { | |||||||
| 	getBlocks blockFetcherFn | 	getBlocks blockFetcherFn | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // create a new peer
 | // newPeer create a new downloader peer, with specific hash and block retrieval
 | ||||||
| func newPeer(id string, hash common.Hash, getHashes hashFetcherFn, getBlocks blockFetcherFn) *peer { | // mechanisms.
 | ||||||
|  | func newPeer(id string, head common.Hash, getHashes hashFetcherFn, getBlocks blockFetcherFn) *peer { | ||||||
| 	return &peer{ | 	return &peer{ | ||||||
| 		id:         id, | 		id:        id, | ||||||
| 		recentHash: hash, | 		head:      head, | ||||||
| 		getHashes:  getHashes, | 		getHashes: getHashes, | ||||||
| 		getBlocks:  getBlocks, | 		getBlocks: getBlocks, | ||||||
| 		state:      idleState, | 		ignored:   set.New(), | ||||||
| 		ignored:    set.New(), |  | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // fetch a chunk using the peer
 | // Reset clears the internal state of a peer entity.
 | ||||||
| func (p *peer) fetch(request *fetchRequest) error { | func (p *peer) Reset() { | ||||||
| 	p.mu.Lock() | 	atomic.StoreInt32(&p.idle, 0) | ||||||
| 	defer p.mu.Unlock() | 	p.ignored.Clear() | ||||||
|  | } | ||||||
| 
 | 
 | ||||||
| 	if p.state == workingState { | // Fetch sends a block retrieval request to the remote peer.
 | ||||||
| 		return errors.New("peer already fetching chunk") | func (p *peer) Fetch(request *fetchRequest) error { | ||||||
|  | 	// Short circuit if the peer is already fetching
 | ||||||
|  | 	if !atomic.CompareAndSwapInt32(&p.idle, 0, 1) { | ||||||
|  | 		return errAlreadyFetching | ||||||
| 	} | 	} | ||||||
| 
 | 	// Convert the hash set to a retrievable slice
 | ||||||
| 	// set working state
 |  | ||||||
| 	p.state = workingState |  | ||||||
| 
 |  | ||||||
| 	// Convert the hash set to a fetchable slice
 |  | ||||||
| 	hashes := make([]common.Hash, 0, len(request.Hashes)) | 	hashes := make([]common.Hash, 0, len(request.Hashes)) | ||||||
| 	for hash, _ := range request.Hashes { | 	for hash, _ := range request.Hashes { | ||||||
| 		hashes = append(hashes, hash) | 		hashes = append(hashes, hash) | ||||||
| @ -99,27 +71,122 @@ func (p *peer) fetch(request *fetchRequest) error { | |||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // promote increases the peer's reputation
 | // SetIdle sets the peer to idle, allowing it to execute new retrieval requests.
 | ||||||
| func (p *peer) promote() { | func (p *peer) SetIdle() { | ||||||
| 	p.mu.Lock() | 	atomic.StoreInt32(&p.idle, 0) | ||||||
| 	defer p.mu.Unlock() |  | ||||||
| 
 |  | ||||||
| 	p.rep++ |  | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // demote decreases the peer's reputation or leaves it at 0
 | // Promote increases the peer's reputation.
 | ||||||
| func (p *peer) demote() { | func (p *peer) Promote() { | ||||||
| 	p.mu.Lock() | 	atomic.AddInt32(&p.rep, 1) | ||||||
| 	defer p.mu.Unlock() | } | ||||||
| 
 | 
 | ||||||
| 	if p.rep > 1 { | // Demote decreases the peer's reputation or leaves it at 0.
 | ||||||
| 		p.rep -= 2 | func (p *peer) Demote() { | ||||||
| 	} else { | 	for { | ||||||
| 		p.rep = 0 | 		// Calculate the new reputation value
 | ||||||
|  | 		prev := atomic.LoadInt32(&p.rep) | ||||||
|  | 		next := prev - 2 | ||||||
|  | 		if next < 0 { | ||||||
|  | 			next = 0 | ||||||
|  | 		} | ||||||
|  | 		// Try to update the old value
 | ||||||
|  | 		if atomic.CompareAndSwapInt32(&p.rep, prev, next) { | ||||||
|  | 			return | ||||||
|  | 		} | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (p *peer) reset() { | // peerSet represents the collection of active peer participating in the block
 | ||||||
| 	p.state = idleState | // download procedure.
 | ||||||
| 	p.ignored.Clear() | type peerSet struct { | ||||||
|  | 	peers map[string]*peer | ||||||
|  | 	lock  sync.RWMutex | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // newPeerSet creates a new peer set top track the active download sources.
 | ||||||
|  | func newPeerSet() *peerSet { | ||||||
|  | 	return &peerSet{ | ||||||
|  | 		peers: make(map[string]*peer), | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // Reset iterates over the current peer set, and resets each of the known peers
 | ||||||
|  | // to prepare for a next batch of block retrieval.
 | ||||||
|  | func (ps *peerSet) Reset() { | ||||||
|  | 	ps.lock.RLock() | ||||||
|  | 	defer ps.lock.RUnlock() | ||||||
|  | 
 | ||||||
|  | 	for _, peer := range ps.peers { | ||||||
|  | 		peer.Reset() | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // Register injects a new peer into the working set, or returns an error if the
 | ||||||
|  | // peer is already known.
 | ||||||
|  | func (ps *peerSet) Register(p *peer) error { | ||||||
|  | 	ps.lock.Lock() | ||||||
|  | 	defer ps.lock.Unlock() | ||||||
|  | 
 | ||||||
|  | 	if _, ok := ps.peers[p.id]; ok { | ||||||
|  | 		return errAlreadyRegistered | ||||||
|  | 	} | ||||||
|  | 	ps.peers[p.id] = p | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // Unregister removes a remote peer from the active set, disabling any further
 | ||||||
|  | // actions to/from that particular entity.
 | ||||||
|  | func (ps *peerSet) Unregister(id string) error { | ||||||
|  | 	ps.lock.Lock() | ||||||
|  | 	defer ps.lock.Unlock() | ||||||
|  | 
 | ||||||
|  | 	if _, ok := ps.peers[id]; !ok { | ||||||
|  | 		return errNotRegistered | ||||||
|  | 	} | ||||||
|  | 	delete(ps.peers, id) | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // Peer retrieves the registered peer with the given id.
 | ||||||
|  | func (ps *peerSet) Peer(id string) *peer { | ||||||
|  | 	ps.lock.RLock() | ||||||
|  | 	defer ps.lock.RUnlock() | ||||||
|  | 
 | ||||||
|  | 	return ps.peers[id] | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // Peers returns if the current number of peers in the set.
 | ||||||
|  | func (ps *peerSet) Peers() int { | ||||||
|  | 	ps.lock.RLock() | ||||||
|  | 	defer ps.lock.RUnlock() | ||||||
|  | 
 | ||||||
|  | 	return len(ps.peers) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // AllPeers retrieves a flat list of all the peers within the set.
 | ||||||
|  | func (ps *peerSet) AllPeers() []*peer { | ||||||
|  | 	ps.lock.RLock() | ||||||
|  | 	defer ps.lock.RUnlock() | ||||||
|  | 
 | ||||||
|  | 	list := make([]*peer, 0, len(ps.peers)) | ||||||
|  | 	for _, p := range ps.peers { | ||||||
|  | 		list = append(list, p) | ||||||
|  | 	} | ||||||
|  | 	return list | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // IdlePeers retrieves a flat list of all the currently idle peers within the
 | ||||||
|  | // active peer set.
 | ||||||
|  | func (ps *peerSet) IdlePeers() []*peer { | ||||||
|  | 	ps.lock.RLock() | ||||||
|  | 	defer ps.lock.RUnlock() | ||||||
|  | 
 | ||||||
|  | 	list := make([]*peer, 0, len(ps.peers)) | ||||||
|  | 	for _, p := range ps.peers { | ||||||
|  | 		if atomic.LoadInt32(&p.idle) == 0 { | ||||||
|  | 			list = append(list, p) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	return list | ||||||
| } | } | ||||||
|  | |||||||
| @ -1,3 +1,6 @@ | |||||||
|  | // Contains the block download scheduler to collect download tasks and schedule
 | ||||||
|  | // them in an ordered, and throttled way.
 | ||||||
|  | 
 | ||||||
| package downloader | package downloader | ||||||
| 
 | 
 | ||||||
| import ( | import ( | ||||||
| @ -8,6 +11,8 @@ import ( | |||||||
| 
 | 
 | ||||||
| 	"github.com/ethereum/go-ethereum/common" | 	"github.com/ethereum/go-ethereum/common" | ||||||
| 	"github.com/ethereum/go-ethereum/core/types" | 	"github.com/ethereum/go-ethereum/core/types" | ||||||
|  | 	"github.com/ethereum/go-ethereum/logger" | ||||||
|  | 	"github.com/ethereum/go-ethereum/logger/glog" | ||||||
| 	"gopkg.in/karalabe/cookiejar.v2/collections/prque" | 	"gopkg.in/karalabe/cookiejar.v2/collections/prque" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| @ -126,6 +131,10 @@ func (q *queue) Insert(hashes []common.Hash) { | |||||||
| 	for i, hash := range hashes { | 	for i, hash := range hashes { | ||||||
| 		index := q.hashCounter + i | 		index := q.hashCounter + i | ||||||
| 
 | 
 | ||||||
|  | 		if old, ok := q.hashPool[hash]; ok { | ||||||
|  | 			glog.V(logger.Warn).Infof("Hash %x already scheduled at index %v", hash, old) | ||||||
|  | 			continue | ||||||
|  | 		} | ||||||
| 		q.hashPool[hash] = index | 		q.hashPool[hash] = index | ||||||
| 		q.hashQueue.Push(hash, float32(index)) // Highest gets schedules first
 | 		q.hashQueue.Push(hash, float32(index)) // Highest gets schedules first
 | ||||||
| 	} | 	} | ||||||
|  | |||||||
		Loading…
	
		Reference in New Issue
	
	Block a user