eth: dedup fetches to ensure no blocks are pulled twice
This commit is contained in:
		
							parent
							
								
									355b1e3bb1
								
							
						
					
					
						commit
						e61db7145a
					
				
							
								
								
									
										26
									
								
								eth/sync.go
									
									
									
									
									
								
							
							
						
						
									
										26
									
								
								eth/sync.go
									
									
									
									
									
								
							| @ -131,6 +131,7 @@ func (pm *ProtocolManager) fetcher() { | |||||||
| 	request := make(map[*peer][]common.Hash) | 	request := make(map[*peer][]common.Hash) | ||||||
| 	pending := make(map[common.Hash]*blockAnnounce) | 	pending := make(map[common.Hash]*blockAnnounce) | ||||||
| 	cycle := time.Tick(notifyCheckCycle) | 	cycle := time.Tick(notifyCheckCycle) | ||||||
|  | 	done := make(chan common.Hash) | ||||||
| 
 | 
 | ||||||
| 	// Iterate the block fetching until a quit is requested
 | 	// Iterate the block fetching until a quit is requested
 | ||||||
| 	for { | 	for { | ||||||
| @ -139,9 +140,18 @@ func (pm *ProtocolManager) fetcher() { | |||||||
| 			// A batch of hashes the notified, schedule them for retrieval
 | 			// A batch of hashes the notified, schedule them for retrieval
 | ||||||
| 			glog.V(logger.Debug).Infof("Scheduling %d hash announcements from %s", len(notifications), notifications[0].peer.id) | 			glog.V(logger.Debug).Infof("Scheduling %d hash announcements from %s", len(notifications), notifications[0].peer.id) | ||||||
| 			for _, announce := range notifications { | 			for _, announce := range notifications { | ||||||
|  | 				// Skip if it's already pending fetch
 | ||||||
|  | 				if _, ok := pending[announce.hash]; ok { | ||||||
|  | 					continue | ||||||
|  | 				} | ||||||
|  | 				// Otherwise queue up the peer as a potential source
 | ||||||
| 				announces[announce.hash] = append(announces[announce.hash], announce) | 				announces[announce.hash] = append(announces[announce.hash], announce) | ||||||
| 			} | 			} | ||||||
| 
 | 
 | ||||||
|  | 		case hash := <-done: | ||||||
|  | 			// A pending import finished, remove all traces
 | ||||||
|  | 			delete(pending, hash) | ||||||
|  | 
 | ||||||
| 		case <-cycle: | 		case <-cycle: | ||||||
| 			// Clean up any expired block fetches
 | 			// Clean up any expired block fetches
 | ||||||
| 			for hash, announce := range pending { | 			for hash, announce := range pending { | ||||||
| @ -207,18 +217,26 @@ func (pm *ProtocolManager) fetcher() { | |||||||
| 			for _, block := range explicit { | 			for _, block := range explicit { | ||||||
| 				hash := block.Hash() | 				hash := block.Hash() | ||||||
| 				if announce := pending[hash]; announce != nil { | 				if announce := pending[hash]; announce != nil { | ||||||
| 					// Filter out blocks too new to import anyway
 | 					// Drop the block if it surely cannot fit
 | ||||||
| 					if !pm.chainman.HasBlock(hash) && pm.chainman.HasBlock(block.ParentHash()) { | 					if pm.chainman.HasBlock(hash) || !pm.chainman.HasBlock(block.ParentHash()) { | ||||||
|  | 						delete(pending, hash) | ||||||
|  | 						continue | ||||||
|  | 					} | ||||||
|  | 					// Otherwise accumulate for import
 | ||||||
| 					peers = append(peers, announce.peer) | 					peers = append(peers, announce.peer) | ||||||
| 					blocks = append(blocks, block) | 					blocks = append(blocks, block) | ||||||
| 				} | 				} | ||||||
| 					delete(pending, hash) |  | ||||||
| 				} |  | ||||||
| 			} | 			} | ||||||
| 			// If any explicit fetches were replied to, import them
 | 			// If any explicit fetches were replied to, import them
 | ||||||
| 			if count := len(blocks); count > 0 { | 			if count := len(blocks); count > 0 { | ||||||
| 				glog.V(logger.Debug).Infof("Importing %d explicitly fetched blocks", len(blocks)) | 				glog.V(logger.Debug).Infof("Importing %d explicitly fetched blocks", len(blocks)) | ||||||
| 				go func() { | 				go func() { | ||||||
|  | 					// Make sure all hashes are cleaned up
 | ||||||
|  | 					for _, block := range blocks { | ||||||
|  | 						hash := block.Hash() | ||||||
|  | 						defer func() { done <- hash }() | ||||||
|  | 					} | ||||||
|  | 					// Try and actually import the blocks
 | ||||||
| 					for i := 0; i < len(blocks); i++ { | 					for i := 0; i < len(blocks); i++ { | ||||||
| 						if err := pm.importBlock(peers[i], blocks[i], nil); err != nil { | 						if err := pm.importBlock(peers[i], blocks[i], nil); err != nil { | ||||||
| 							glog.V(logger.Detail).Infof("Failed to import explicitly fetched block: %v", err) | 							glog.V(logger.Detail).Infof("Failed to import explicitly fetched block: %v", err) | ||||||
|  | |||||||
		Loading…
	
		Reference in New Issue
	
	Block a user