Merge pull request #1188 from karalabe/newblockhashes-proposal
eth: implement the NewBlockHashes protocol proposal
commit 05cae69d72

eth/handler.go (202 changed lines)

@@ -2,6 +2,7 @@ package eth
 
 import (
 	"fmt"
+	"math"
 	"math/big"
 	"sync"
 	"time"
@@ -17,13 +18,6 @@ import (
 	"github.com/ethereum/go-ethereum/rlp"
 )
 
-const (
-	forceSyncCycle      = 10 * time.Second       // Time interval to force syncs, even if few peers are available
-	blockProcCycle      = 500 * time.Millisecond // Time interval to check for new blocks to process
-	minDesiredPeerCount = 5                      // Amount of peers desired to start syncing
-	blockProcAmount     = 256
-)
-
 func errResp(code errCode, format string, v ...interface{}) error {
 	return fmt.Errorf("%v - %v", code, fmt.Sprintf(format, v...))
 }
@@ -54,8 +48,11 @@ type ProtocolManager struct {
 	txSub         event.Subscription
 	minedBlockSub event.Subscription
 
-	newPeerCh chan *peer
-	quitSync  chan struct{}
+	newPeerCh  chan *peer
+	newHashCh  chan []*blockAnnounce
+	newBlockCh chan chan []*types.Block
+	quitSync   chan struct{}
+
 	// wait group is used for graceful shutdowns during downloading
 	// and processing
 	wg   sync.WaitGroup
@@ -72,6 +69,8 @@ func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpo
 		downloader: downloader,
 		peers:      newPeerSet(),
 		newPeerCh:  make(chan *peer, 1),
+		newHashCh:  make(chan []*blockAnnounce, 1),
+		newBlockCh: make(chan chan []*types.Block),
 		quitSync:   make(chan struct{}),
 	}
 
@@ -119,7 +118,8 @@ func (pm *ProtocolManager) Start() {
 	pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
 	go pm.minedBroadcastLoop()
 
-	go pm.update()
+	go pm.syncer()
+	go pm.fetcher()
 }
 
 func (pm *ProtocolManager) Stop() {
@@ -186,7 +186,6 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
 	defer msg.Discard()
 
 	switch msg.Code {
-	case GetTxMsg: // ignore
 	case StatusMsg:
 		return errResp(ErrExtraStatusMsg, "uncontrolled status message")
 
@@ -227,6 +226,7 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
 
 		// returns either requested hashes or nothing (i.e. not found)
 		return p.sendBlockHashes(hashes)
+
 	case BlockHashesMsg:
 		msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
 
@@ -266,15 +266,66 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
 			}
 		}
 		return p.sendBlocks(blocks)
-	case BlocksMsg:
-		var blocks []*types.Block
 
+	case BlocksMsg:
+		// Decode the arrived block message
 		msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
+
+		var blocks []*types.Block
 		if err := msgStream.Decode(&blocks); err != nil {
 			glog.V(logger.Detail).Infoln("Decode error", err)
 			blocks = nil
 		}
-		self.downloader.DeliverBlocks(p.id, blocks)
+		// Filter out any explicitly requested blocks (cascading select to get blocking back to peer)
+		filter := make(chan []*types.Block)
+		select {
+		case <-self.quitSync:
+		case self.newBlockCh <- filter:
+			select {
+			case <-self.quitSync:
+			case filter <- blocks:
+				select {
+				case <-self.quitSync:
+				case blocks := <-filter:
+					self.downloader.DeliverBlocks(p.id, blocks)
+				}
+			}
+		}
+
+	case NewBlockHashesMsg:
+		// Retrieve and deserialize the remote new block hashes notification
+		msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
+
+		var hashes []common.Hash
+		if err := msgStream.Decode(&hashes); err != nil {
+			break
+		}
+		// Mark the hashes as present at the remote node
+		for _, hash := range hashes {
+			p.blockHashes.Add(hash)
+			p.recentHash = hash
+		}
+		// Schedule all the unknown hashes for retrieval
+		unknown := make([]common.Hash, 0, len(hashes))
+		for _, hash := range hashes {
+			if !self.chainman.HasBlock(hash) {
+				unknown = append(unknown, hash)
+			}
+		}
+		announces := make([]*blockAnnounce, len(unknown))
+		for i, hash := range unknown {
+			announces[i] = &blockAnnounce{
+				hash: hash,
+				peer: p,
+				time: time.Now(),
+			}
+		}
+		if len(announces) > 0 {
+			select {
+			case self.newHashCh <- announces:
+			case <-self.quitSync:
+			}
+		}
 
 	case NewBlockMsg:
 		var request newBlockMsgData
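A note on the BlocksMsg branch above: the nested selects form a request/response handshake with the new fetcher goroutine. The handler offers it a fresh filter channel, pushes the decoded blocks through it, and reads back whichever blocks the fetcher did not claim as explicitly requested; only that remainder reaches the downloader. Below is a stripped-down sketch of the same handshake pattern with generic types and hypothetical names, not code from this PR:

```go
package main

import "fmt"

// handshake mirrors the cascading select: offer a filter channel to a worker,
// send it the batch, then read back the items the worker left unclaimed.
func handshake(newItemCh chan chan []int, quit chan struct{}, items []int) []int {
	filter := make(chan []int)
	select {
	case <-quit:
		return nil
	case newItemCh <- filter: // announce the batch to the worker
		select {
		case <-quit:
			return nil
		case filter <- items: // deliver the batch
			select {
			case <-quit:
				return nil
			case rest := <-filter: // receive the unclaimed remainder
				return rest
			}
		}
	}
}

func main() {
	newItemCh := make(chan chan []int)
	quit := make(chan struct{})

	// Worker standing in for the fetcher: it claims the even numbers and
	// hands everything else back.
	go func() {
		filter := <-newItemCh
		items := <-filter
		var rest []int
		for _, v := range items {
			if v%2 != 0 {
				rest = append(rest, v)
			}
		}
		filter <- rest
	}()

	fmt.Println(handshake(newItemCh, quit, []int{1, 2, 3, 4})) // prints [1 3]
}
```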
@@ -286,83 +337,86 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
 		}
 		request.Block.ReceivedAt = msg.ReceivedAt
 
-		hash := request.Block.Hash()
-		// Add the block hash as a known hash to the peer. This will later be used to determine
-		// who should receive this.
-		p.blockHashes.Add(hash)
-		// update the peer info
-		p.recentHash = hash
-		p.td = request.TD
-
-		_, chainHead, _ := self.chainman.Status()
-
-		jsonlogger.LogJson(&logger.EthChainReceivedNewBlock{
-			BlockHash:     hash.Hex(),
-			BlockNumber:   request.Block.Number(), // this surely must be zero
-			ChainHeadHash: chainHead.Hex(),
-			BlockPrevHash: request.Block.ParentHash().Hex(),
-			RemoteId:      p.ID().String(),
-		})
-
-		// Make sure the block isn't already known. If this is the case simply drop
-		// the message and move on. If the TD is < currentTd; drop it as well. If this
-		// chain at some point becomes canonical, the downloader will fetch it.
-		if self.chainman.HasBlock(hash) {
-			break
-		}
-		if self.chainman.Td().Cmp(request.TD) > 0 && new(big.Int).Add(request.Block.Number(), big.NewInt(7)).Cmp(self.chainman.CurrentBlock().Number()) < 0 {
-			glog.V(logger.Debug).Infof("[%s] dropped block %v due to low TD %v\n", p.id, request.Block.Number(), request.TD)
-			break
+		if err := self.importBlock(p, request.Block, request.TD); err != nil {
+			return err
 		}
 
-		// Attempt to insert the newly received by checking if the parent exists.
-		// if the parent exists we process the block and propagate to our peers
-		// otherwise synchronize with the peer
-		if self.chainman.HasBlock(request.Block.ParentHash()) {
-			if _, err := self.chainman.InsertChain(types.Blocks{request.Block}); err != nil {
-				glog.V(logger.Error).Infoln("removed peer (", p.id, ") due to block error")
-
-				self.removePeer(p.id)
-
-				return nil
-			}
-
-			if err := self.verifyTd(p, request); err != nil {
-				glog.V(logger.Error).Infoln(err)
-				// XXX for now return nil so it won't disconnect (we should in the future)
-				return nil
-			}
-			self.BroadcastBlock(hash, request.Block)
-		} else {
-			go self.synchronise(p)
-		}
 	default:
 		return errResp(ErrInvalidMsgCode, "%v", msg.Code)
 	}
 	return nil
 }
 
-func (pm *ProtocolManager) verifyTd(peer *peer, request newBlockMsgData) error {
-	if request.Block.Td.Cmp(request.TD) != 0 {
-		glog.V(logger.Detail).Infoln(peer)
-
-		return fmt.Errorf("invalid TD on block(%v) from peer(%s): block.td=%v, request.td=%v", request.Block.Number(), peer.id, request.Block.Td, request.TD)
+// importBlock injects a new block retrieved from the given peer into the chain
+// manager.
+func (pm *ProtocolManager) importBlock(p *peer, block *types.Block, td *big.Int) error {
+	hash := block.Hash()
+
+	// Mark the block as present at the remote node (don't duplicate already held data)
+	p.blockHashes.Add(hash)
+	p.recentHash = hash
+	if td != nil {
+		p.td = td
+	}
+	// Log the block's arrival
+	_, chainHead, _ := pm.chainman.Status()
+	jsonlogger.LogJson(&logger.EthChainReceivedNewBlock{
+		BlockHash:     hash.Hex(),
+		BlockNumber:   block.Number(),
+		ChainHeadHash: chainHead.Hex(),
+		BlockPrevHash: block.ParentHash().Hex(),
+		RemoteId:      p.ID().String(),
+	})
+	// If the block's already known or its difficulty is lower than ours, drop
+	if pm.chainman.HasBlock(hash) {
+		p.td = pm.chainman.GetBlock(hash).Td // update the peer's TD to the real value
+		return nil
+	}
+	if td != nil && pm.chainman.Td().Cmp(td) > 0 && new(big.Int).Add(block.Number(), big.NewInt(7)).Cmp(pm.chainman.CurrentBlock().Number()) < 0 {
+		glog.V(logger.Debug).Infof("[%s] dropped block %v due to low TD %v\n", p.id, block.Number(), td)
+		return nil
+	}
+	// Attempt to insert the newly received block and propagate to our peers
+	if pm.chainman.HasBlock(block.ParentHash()) {
+		if _, err := pm.chainman.InsertChain(types.Blocks{block}); err != nil {
+			glog.V(logger.Error).Infoln("removed peer (", p.id, ") due to block error", err)
+			return err
+		}
+		if td != nil && block.Td.Cmp(td) != 0 {
+			err := fmt.Errorf("invalid TD on block(%v) from peer(%s): block.td=%v, request.td=%v", block.Number(), p.id, block.Td, td)
+			glog.V(logger.Error).Infoln(err)
+			return err
+		}
+		pm.BroadcastBlock(hash, block)
+		return nil
+	}
+	// Parent of the block is unknown, try to sync with this peer if it seems to be good
+	if td != nil {
+		go pm.synchronise(p)
 	}
-
 	return nil
 }
 
-// BroadcastBlock will propagate the block to its connected peers. It will sort
-// out which peers do not contain the block in their block set and will do a
-// sqrt(peers) to determine the amount of peers we broadcast to.
+// BroadcastBlock will propagate the block to a subset of its connected peers,
+// only notifying the rest of the block's appearance.
 func (pm *ProtocolManager) BroadcastBlock(hash common.Hash, block *types.Block) {
-	// Broadcast block to a batch of peers not knowing about it
+	// Retrieve all the target peers and split between full broadcast or only notification
 	peers := pm.peers.PeersWithoutBlock(hash)
-	//peers = peers[:int(math.Sqrt(float64(len(peers))))]
-	for _, peer := range peers {
+	split := int(math.Sqrt(float64(len(peers))))
+
+	transfer := peers[:split]
+	notify := peers[split:]
+
+	// Send out the data transfers and the notifications
+	for _, peer := range notify {
+		peer.sendNewBlockHashes([]common.Hash{hash})
+	}
+	glog.V(logger.Detail).Infoln("broadcast hash to", len(notify), "peers.")
+
+	for _, peer := range transfer {
 		peer.sendNewBlock(block)
 	}
-	glog.V(logger.Detail).Infoln("broadcast block to", len(peers), "peers. Total processing time:", time.Since(block.ReceivedAt))
+	glog.V(logger.Detail).Infoln("broadcast block to", len(transfer), "peers. Total processing time:", time.Since(block.ReceivedAt))
 }
 
 // BroadcastTx will propagate the block to its connected peers. It will sort
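A note on the reworked BroadcastBlock: of the peers that do not yet have the block, only a square-root sized subset receives the full body, while the rest get a bare hash announcement; that is what the new math import and the sendNewBlockHashes helper in eth/peer.go below are for. A toy sketch of the split arithmetic, using hypothetical peer names rather than the real peerSet:

```go
package main

import (
	"fmt"
	"math"
)

func main() {
	// Hypothetical stand-ins for peers.PeersWithoutBlock(hash).
	peers := []string{"p1", "p2", "p3", "p4", "p5", "p6", "p7", "p8", "p9"}

	// Same split rule as the diff: sqrt(n) peers get the block body,
	// the remainder only get the hash announcement.
	split := int(math.Sqrt(float64(len(peers))))
	transfer, notify := peers[:split], peers[split:]

	fmt.Println("full block to:", transfer) // 3 of 9 peers
	fmt.Println("hash only to: ", notify)   // the other 6
}
```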
eth/peer.go

@@ -88,6 +88,13 @@ func (p *peer) sendBlocks(blocks []*types.Block) error {
 	return p2p.Send(p.rw, BlocksMsg, blocks)
 }
 
+func (p *peer) sendNewBlockHashes(hashes []common.Hash) error {
+	for _, hash := range hashes {
+		p.blockHashes.Add(hash)
+	}
+	return p2p.Send(p.rw, NewBlockHashesMsg, hashes)
+}
+
 func (p *peer) sendNewBlock(block *types.Block) error {
 	p.blockHashes.Add(block.Hash())
 
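On the wire, the NewBlockHashesMsg payload produced by sendNewBlockHashes is simply an RLP list of 32-byte hashes. A minimal round-trip sketch (standalone example, not part of the diff, assuming the rlp package's EncodeToBytes and Decode helpers):

```go
package main

import (
	"bytes"
	"fmt"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/rlp"
)

func main() {
	// A hypothetical announcement of a single freshly propagated block.
	announced := []common.Hash{common.HexToHash("0x1234")}

	// Sender side: p2p.Send ultimately RLP-encodes the payload like this.
	payload, err := rlp.EncodeToBytes(announced)
	if err != nil {
		panic(err)
	}
	// Receiver side: the NewBlockHashesMsg branch decodes back into []common.Hash.
	var hashes []common.Hash
	if err := rlp.Decode(bytes.NewReader(payload), &hashes); err != nil {
		panic(err)
	}
	fmt.Printf("received %d block hash announcement(s)\n", len(hashes))
}
```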
eth/protocol.go

@@ -17,7 +17,7 @@ const (
 // eth protocol message codes
 const (
 	StatusMsg = iota
-	GetTxMsg  // unused
+	NewBlockHashesMsg
 	TxMsg
 	GetBlockHashesMsg
 	BlockHashesMsg
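Because the codes are an iota enumeration, reusing the slot of the retired GetTxMsg keeps the numeric values of the codes that follow unchanged. The values implied by the hunk above come out as sketched below (only the codes visible in this hunk are shown; an illustration, not the full list):

```go
package main

import "fmt"

// Message codes as enumerated in the hunk above; values are implied by iota.
const (
	StatusMsg         = iota // 0
	NewBlockHashesMsg        // 1, taking over the slot of the unused GetTxMsg
	TxMsg                    // 2
	GetBlockHashesMsg        // 3
	BlockHashesMsg           // 4
)

func main() {
	fmt.Println(StatusMsg, NewBlockHashesMsg, TxMsg, GetBlockHashesMsg, BlockHashesMsg)
}
```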
eth/sync.go (132 changed lines)

@@ -5,15 +5,135 @@ import (
 	"sync/atomic"
 	"time"
 
+	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/core/types"
 	"github.com/ethereum/go-ethereum/eth/downloader"
 	"github.com/ethereum/go-ethereum/logger"
 	"github.com/ethereum/go-ethereum/logger/glog"
 )
 
-// update periodically tries to synchronise with the network, both downloading
-// hashes and blocks as well as retrieving cached ones.
-func (pm *ProtocolManager) update() {
+const (
+	forceSyncCycle      = 10 * time.Second       // Time interval to force syncs, even if few peers are available
+	blockProcCycle      = 500 * time.Millisecond // Time interval to check for new blocks to process
+	notifyCheckCycle    = 100 * time.Millisecond // Time interval to allow hash notifies to fulfill before hard fetching
+	notifyArriveTimeout = 500 * time.Millisecond // Time allowance before an announced block is explicitly requested
+	notifyFetchTimeout  = 5 * time.Second        // Maximum allotted time to return an explicitly requested block
+	minDesiredPeerCount = 5                      // Amount of peers desired to start syncing
+	blockProcAmount     = 256
+)
+
+// blockAnnounce is the hash notification of the availability of a new block in
+// the network.
+type blockAnnounce struct {
+	hash common.Hash
+	peer *peer
+	time time.Time
+}
+
+// fetcher is responsible for collecting hash notifications, and periodically
+// checking all unknown ones and individually fetching them.
+func (pm *ProtocolManager) fetcher() {
+	announces := make(map[common.Hash]*blockAnnounce)
+	request := make(map[*peer][]common.Hash)
+	pending := make(map[common.Hash]*blockAnnounce)
+	cycle := time.Tick(notifyCheckCycle)
+
+	// Iterate the block fetching until a quit is requested
+	for {
+		select {
+		case notifications := <-pm.newHashCh:
+			// A batch of hashes was notified, schedule them for retrieval
+			glog.V(logger.Debug).Infof("Scheduling %d hash announcements from %s", len(notifications), notifications[0].peer.id)
+			for _, announce := range notifications {
+				announces[announce.hash] = announce
+			}
+
+		case <-cycle:
+			// Clean up any expired block fetches
+			for hash, announce := range pending {
+				if time.Since(announce.time) > notifyFetchTimeout {
+					delete(pending, hash)
+				}
+			}
+			// Check if any notified blocks failed to arrive
+			for hash, announce := range announces {
+				if time.Since(announce.time) > notifyArriveTimeout {
+					if !pm.chainman.HasBlock(hash) {
+						request[announce.peer] = append(request[announce.peer], hash)
+						pending[hash] = announce
+					}
+					delete(announces, hash)
+				}
+			}
+			if len(request) == 0 {
+				break
+			}
+			// Send out all block requests
+			for peer, hashes := range request {
+				glog.V(logger.Debug).Infof("Explicitly fetching %d blocks from %s", len(hashes), peer.id)
+				peer.requestBlocks(hashes)
+			}
+			request = make(map[*peer][]common.Hash)
+
+		case filter := <-pm.newBlockCh:
+			// Blocks arrived, extract any explicit fetches, return all else
+			var blocks types.Blocks
+			select {
+			case blocks = <-filter:
+			case <-pm.quitSync:
+				return
+			}
+
+			explicit, download := []*types.Block{}, []*types.Block{}
+			for _, block := range blocks {
+				hash := block.Hash()
+
+				// Filter explicitly requested blocks from hash announcements
+				if _, ok := pending[hash]; ok {
+					// Discard if already imported by other means
+					if !pm.chainman.HasBlock(hash) {
+						explicit = append(explicit, block)
+					} else {
+						delete(pending, hash)
+					}
+				} else {
+					download = append(download, block)
+				}
+			}
+
+			select {
+			case filter <- download:
+			case <-pm.quitSync:
+				return
+			}
+			// If any explicit fetches were replied to, import them
+			if count := len(explicit); count > 0 {
+				glog.V(logger.Debug).Infof("Importing %d explicitly fetched blocks", count)
+				go func() {
+					for _, block := range explicit {
+						hash := block.Hash()
+
+						// Make sure there's still something pending to import
+						if announce := pending[hash]; announce != nil {
+							delete(pending, hash)
+							if err := pm.importBlock(announce.peer, block, nil); err != nil {
+								glog.V(logger.Detail).Infof("Failed to import explicitly fetched block: %v", err)
+								return
+							}
+						}
+					}
+				}()
+			}
+
+		case <-pm.quitSync:
+			return
+		}
+	}
+}
+
+// syncer is responsible for periodically synchronising with the network, both
+// downloading hashes and blocks as well as retrieving cached ones.
+func (pm *ProtocolManager) syncer() {
 	forceSync := time.Tick(forceSyncCycle)
 	blockProc := time.Tick(blockProcCycle)
 	blockProcPend := int32(0)
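A note on the fetcher above: a hash announcement that has not turned into a block within notifyArriveTimeout (500ms) is explicitly requested on the next notifyCheckCycle tick (100ms), and an explicit request still unanswered after notifyFetchTimeout (5s) is dropped. A stripped-down sketch of that expiry bookkeeping, with hypothetical stand-in types rather than this PR's code:

```go
package main

import (
	"fmt"
	"time"
)

// announcement is a stand-in for blockAnnounce: who announced a hash, and when.
type announcement struct {
	peer string
	time time.Time
}

const (
	arriveTimeout = 500 * time.Millisecond // wait this long for the block to arrive on its own
	fetchTimeout  = 5 * time.Second        // give up on an explicit request after this long
)

// expire mimics one <-cycle tick: overdue announcements become explicit
// requests, and requests outstanding for too long are forgotten.
func expire(announces, pending map[string]announcement, hasBlock func(string) bool) map[string][]string {
	requests := make(map[string][]string) // peer -> hashes to fetch explicitly
	for hash, a := range pending {
		if time.Since(a.time) > fetchTimeout {
			delete(pending, hash)
		}
	}
	for hash, a := range announces {
		if time.Since(a.time) > arriveTimeout {
			if !hasBlock(hash) {
				requests[a.peer] = append(requests[a.peer], hash)
				pending[hash] = a
			}
			delete(announces, hash)
		}
	}
	return requests
}

func main() {
	announces := map[string]announcement{
		"0xaa": {peer: "p1", time: time.Now().Add(-time.Second)}, // overdue and still unknown
	}
	pending := map[string]announcement{}

	fmt.Println(expire(announces, pending, func(string) bool { return false })) // map[p1:[0xaa]]
}
```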
@@ -99,15 +219,15 @@ func (pm *ProtocolManager) synchronise(peer *peer) {
 		return
 	}
 	// Get the hashes from the peer (synchronously)
-	glog.V(logger.Debug).Infof("Attempting synchronisation: %v, 0x%x", peer.id, peer.recentHash)
+	glog.V(logger.Detail).Infof("Attempting synchronisation: %v, 0x%x", peer.id, peer.recentHash)
 
 	err := pm.downloader.Synchronise(peer.id, peer.recentHash)
 	switch err {
 	case nil:
-		glog.V(logger.Debug).Infof("Synchronisation completed")
+		glog.V(logger.Detail).Infof("Synchronisation completed")
 
 	case downloader.ErrBusy:
-		glog.V(logger.Debug).Infof("Synchronisation already in progress")
+		glog.V(logger.Detail).Infof("Synchronisation already in progress")
 
 	case downloader.ErrTimeout, downloader.ErrBadPeer, downloader.ErrEmptyHashSet, downloader.ErrInvalidChain, downloader.ErrCrossCheckFailed:
 		glog.V(logger.Debug).Infof("Removing peer %v: %v", peer.id, err)