eth/downloader: differentiate stale and nonexistent deliveries
This commit is contained in:
		
							parent
							
								
									94e4aa6ea9
								
							
						
					
					
						commit
						328ef60b85
					
				| @ -422,28 +422,46 @@ out: | |||||||
| 			// in a reasonable time frame, ignore it's message.
 | 			// in a reasonable time frame, ignore it's message.
 | ||||||
| 			if peer := d.peers.Peer(blockPack.peerId); peer != nil { | 			if peer := d.peers.Peer(blockPack.peerId); peer != nil { | ||||||
| 				// Deliver the received chunk of blocks, and demote in case of errors
 | 				// Deliver the received chunk of blocks, and demote in case of errors
 | ||||||
| 				if err := d.queue.Deliver(blockPack.peerId, blockPack.blocks); err != nil { | 				err := d.queue.Deliver(blockPack.peerId, blockPack.blocks) | ||||||
| 					if err == ErrInvalidChain { | 				switch err { | ||||||
| 						// The hash chain is invalid (blocks are not ordered properly), abort
 | 				case nil: | ||||||
| 						return err | 					// If no blocks were delivered, demote the peer (need the delivery above)
 | ||||||
|  | 					if len(blockPack.blocks) == 0 { | ||||||
|  | 						peer.Demote() | ||||||
|  | 						peer.SetIdle() | ||||||
|  | 						glog.V(logger.Detail).Infof("%s: no blocks delivered", peer) | ||||||
|  | 						break | ||||||
| 					} | 					} | ||||||
| 					// Peer did deliver, but some blocks were off, penalize
 | 					// All was successful, promote the peer
 | ||||||
|  | 					peer.Promote() | ||||||
|  | 					peer.SetIdle() | ||||||
|  | 					glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blockPack.blocks)) | ||||||
|  | 
 | ||||||
|  | 				case ErrInvalidChain: | ||||||
|  | 					// The hash chain is invalid (blocks are not ordered properly), abort
 | ||||||
|  | 					return err | ||||||
|  | 
 | ||||||
|  | 				case errNoFetchesPending: | ||||||
|  | 					// Peer probably timed out with its delivery but came through
 | ||||||
|  | 					// in the end, demote, but allow to to pull from this peer.
 | ||||||
| 					peer.Demote() | 					peer.Demote() | ||||||
| 					peer.SetIdle() | 					peer.SetIdle() | ||||||
| 					glog.V(logger.Detail).Infof("%s: block delivery failed: %v", peer, err) | 					glog.V(logger.Detail).Infof("%s: out of bound delivery", peer) | ||||||
| 					break | 
 | ||||||
| 				} | 				case errStaleDelivery: | ||||||
| 				// If no blocks were delivered, demote the peer (above code is needed to mark the packet done!)
 | 					// Delivered something completely else than requested, usually
 | ||||||
| 				if len(blockPack.blocks) == 0 { | 					// caused by a timeout and delivery during a new sync cycle.
 | ||||||
|  | 					// Don't set it to idle as the original request should still be
 | ||||||
|  | 					// in flight.
 | ||||||
|  | 					peer.Demote() | ||||||
|  | 					glog.V(logger.Detail).Infof("%s: stale delivery", peer) | ||||||
|  | 
 | ||||||
|  | 				default: | ||||||
|  | 					// Peer did something semi-useful, demote but keep it around
 | ||||||
| 					peer.Demote() | 					peer.Demote() | ||||||
| 					peer.SetIdle() | 					peer.SetIdle() | ||||||
| 					glog.V(logger.Detail).Infof("%s: no blocks delivered", peer) | 					glog.V(logger.Detail).Infof("%s: delivery partially failed: %v", peer, err) | ||||||
| 					break |  | ||||||
| 				} | 				} | ||||||
| 				// All was successful, promote the peer
 |  | ||||||
| 				peer.Promote() |  | ||||||
| 				peer.SetIdle() |  | ||||||
| 				glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blockPack.blocks)) |  | ||||||
| 			} | 			} | ||||||
| 		case <-ticker.C: | 		case <-ticker.C: | ||||||
| 			// Check for bad peers. Bad peers may indicate a peer not responding
 | 			// Check for bad peers. Bad peers may indicate a peer not responding
 | ||||||
|  | |||||||
| @ -20,6 +20,11 @@ const ( | |||||||
| 	blockCacheLimit = 8 * MaxBlockFetch // Maximum number of blocks to cache before throttling the download
 | 	blockCacheLimit = 8 * MaxBlockFetch // Maximum number of blocks to cache before throttling the download
 | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
|  | var ( | ||||||
|  | 	errNoFetchesPending = errors.New("no fetches pending") | ||||||
|  | 	errStaleDelivery    = errors.New("stale delivery") | ||||||
|  | ) | ||||||
|  | 
 | ||||||
| // fetchRequest is a currently running block retrieval operation.
 | // fetchRequest is a currently running block retrieval operation.
 | ||||||
| type fetchRequest struct { | type fetchRequest struct { | ||||||
| 	Peer   *peer               // Peer to which the request was sent
 | 	Peer   *peer               // Peer to which the request was sent
 | ||||||
| @ -293,7 +298,7 @@ func (q *queue) Deliver(id string, blocks []*types.Block) (err error) { | |||||||
| 	// Short circuit if the blocks were never requested
 | 	// Short circuit if the blocks were never requested
 | ||||||
| 	request := q.pendPool[id] | 	request := q.pendPool[id] | ||||||
| 	if request == nil { | 	if request == nil { | ||||||
| 		return errors.New("no fetches pending") | 		return errNoFetchesPending | ||||||
| 	} | 	} | ||||||
| 	delete(q.pendPool, id) | 	delete(q.pendPool, id) | ||||||
| 
 | 
 | ||||||
| @ -309,7 +314,7 @@ func (q *queue) Deliver(id string, blocks []*types.Block) (err error) { | |||||||
| 		// Skip any blocks that were not requested
 | 		// Skip any blocks that were not requested
 | ||||||
| 		hash := block.Hash() | 		hash := block.Hash() | ||||||
| 		if _, ok := request.Hashes[hash]; !ok { | 		if _, ok := request.Hashes[hash]; !ok { | ||||||
| 			errs = append(errs, fmt.Errorf("non-requested block %v", hash)) | 			errs = append(errs, fmt.Errorf("non-requested block %x", hash)) | ||||||
| 			continue | 			continue | ||||||
| 		} | 		} | ||||||
| 		// If a requested block falls out of the range, the hash chain is invalid
 | 		// If a requested block falls out of the range, the hash chain is invalid
 | ||||||
| @ -326,11 +331,15 @@ func (q *queue) Deliver(id string, blocks []*types.Block) (err error) { | |||||||
| 		delete(q.hashPool, hash) | 		delete(q.hashPool, hash) | ||||||
| 		q.blockPool[hash] = int(block.NumberU64()) | 		q.blockPool[hash] = int(block.NumberU64()) | ||||||
| 	} | 	} | ||||||
| 	// Return all failed fetches to the queue
 | 	// Return all failed or missing fetches to the queue
 | ||||||
| 	for hash, index := range request.Hashes { | 	for hash, index := range request.Hashes { | ||||||
| 		q.hashQueue.Push(hash, float32(index)) | 		q.hashQueue.Push(hash, float32(index)) | ||||||
| 	} | 	} | ||||||
|  | 	// If none of the blocks were good, it's a stale delivery
 | ||||||
| 	if len(errs) != 0 { | 	if len(errs) != 0 { | ||||||
|  | 		if len(errs) == len(blocks) { | ||||||
|  | 			return errStaleDelivery | ||||||
|  | 		} | ||||||
| 		return fmt.Errorf("multiple failures: %v", errs) | 		return fmt.Errorf("multiple failures: %v", errs) | ||||||
| 	} | 	} | ||||||
| 	return nil | 	return nil | ||||||
|  | |||||||
		Loading…
	
		Reference in New Issue
	
	Block a user