core: partially removed nonce parallelisation and added merge error chk
Invalid forks are now detected Current setup of parellelisation actually inserts bad blocks. This fix is tmp until a better one is found
This commit is contained in:
		
							parent
							
								
									4baa5ca963
								
							
						
					
					
						commit
						75f5ae80fd
					
				| @ -548,18 +548,21 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) { | |||||||
| 		tstart     = time.Now() | 		tstart     = time.Now() | ||||||
| 	) | 	) | ||||||
| 
 | 
 | ||||||
| 	// check the nonce in parallel to the block processing
 |  | ||||||
| 	// this speeds catching up significantly
 |  | ||||||
| 	nonceErrCh := make(chan error) |  | ||||||
| 	go func() { |  | ||||||
| 		nonceErrCh <- verifyNonces(self.pow, chain) |  | ||||||
| 	}() |  | ||||||
| 
 |  | ||||||
| 	for i, block := range chain { | 	for i, block := range chain { | ||||||
| 		if block == nil { | 		if block == nil { | ||||||
| 			continue | 			continue | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
|  | 		if BadHashes[block.Hash()] { | ||||||
|  | 			err := fmt.Errorf("Found known bad hash in chain %x", block.Hash()) | ||||||
|  | 			blockErr(block, err) | ||||||
|  | 			return i, err | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		// create a nonce channel for parallisation of the nonce check
 | ||||||
|  | 		nonceErrCh := make(chan error) | ||||||
|  | 		go verifyBlockNonce(self.pow, block, nonceErrCh) | ||||||
|  | 
 | ||||||
| 		// Setting block.Td regardless of error (known for example) prevents errors down the line
 | 		// Setting block.Td regardless of error (known for example) prevents errors down the line
 | ||||||
| 		// in the protocol handler
 | 		// in the protocol handler
 | ||||||
| 		block.Td = new(big.Int).Set(CalcTD(block, self.GetBlock(block.ParentHash()))) | 		block.Td = new(big.Int).Set(CalcTD(block, self.GetBlock(block.ParentHash()))) | ||||||
| @ -568,13 +571,14 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) { | |||||||
| 		// all others will fail too (unless a known block is returned).
 | 		// all others will fail too (unless a known block is returned).
 | ||||||
| 		logs, err := self.processor.Process(block) | 		logs, err := self.processor.Process(block) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
|  | 			// empty the nonce channel
 | ||||||
|  | 			<-nonceErrCh | ||||||
|  | 
 | ||||||
| 			if IsKnownBlockErr(err) { | 			if IsKnownBlockErr(err) { | ||||||
| 				stats.ignored++ | 				stats.ignored++ | ||||||
| 				continue | 				continue | ||||||
| 			} | 			} | ||||||
| 
 | 
 | ||||||
| 			// Do not penelise on future block. We'll need a block queue eventually that will queue
 |  | ||||||
| 			// future block for future use
 |  | ||||||
| 			if err == BlockFutureErr { | 			if err == BlockFutureErr { | ||||||
| 				block.SetQueued(true) | 				block.SetQueued(true) | ||||||
| 				self.futureBlocks.Push(block) | 				self.futureBlocks.Push(block) | ||||||
| @ -593,18 +597,23 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) { | |||||||
| 
 | 
 | ||||||
| 			return i, err | 			return i, err | ||||||
| 		} | 		} | ||||||
|  | 		// Wait and check nonce channel and make sure it checks out fine
 | ||||||
|  | 		// otherwise return the error
 | ||||||
|  | 		if err := <-nonceErrCh; err != nil { | ||||||
|  | 			return i, err | ||||||
|  | 		} | ||||||
| 
 | 
 | ||||||
| 		cblock := self.currentBlock | 		cblock := self.currentBlock | ||||||
| 		// Write block to database. Eventually we'll have to improve on this and throw away blocks that are
 |  | ||||||
| 		// not in the canonical chain.
 |  | ||||||
| 		self.write(block) |  | ||||||
| 		// Compare the TD of the last known block in the canonical chain to make sure it's greater.
 | 		// Compare the TD of the last known block in the canonical chain to make sure it's greater.
 | ||||||
| 		// At this point it's possible that a different chain (fork) becomes the new canonical chain.
 | 		// At this point it's possible that a different chain (fork) becomes the new canonical chain.
 | ||||||
| 		if block.Td.Cmp(self.td) > 0 { | 		if block.Td.Cmp(self.td) > 0 { | ||||||
| 			// chain fork
 | 			// chain fork
 | ||||||
| 			if block.ParentHash() != cblock.Hash() { | 			if block.ParentHash() != cblock.Hash() { | ||||||
| 				// during split we merge two different chains and create the new canonical chain
 | 				// during split we merge two different chains and create the new canonical chain
 | ||||||
| 				self.merge(cblock, block) | 				err := self.merge(cblock, block) | ||||||
|  | 				if err != nil { | ||||||
|  | 					return i, err | ||||||
|  | 				} | ||||||
| 
 | 
 | ||||||
| 				queue[i] = ChainSplitEvent{block, logs} | 				queue[i] = ChainSplitEvent{block, logs} | ||||||
| 				queueEvent.splitCount++ | 				queueEvent.splitCount++ | ||||||
| @ -637,19 +646,16 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) { | |||||||
| 			queue[i] = ChainSideEvent{block, logs} | 			queue[i] = ChainSideEvent{block, logs} | ||||||
| 			queueEvent.sideCount++ | 			queueEvent.sideCount++ | ||||||
| 		} | 		} | ||||||
|  | 		// Write block to database. Eventually we'll have to improve on this and throw away blocks that are
 | ||||||
|  | 		// not in the canonical chain.
 | ||||||
|  | 		self.write(block) | ||||||
|  | 		// Delete from future blocks
 | ||||||
| 		self.futureBlocks.Delete(block.Hash()) | 		self.futureBlocks.Delete(block.Hash()) | ||||||
| 
 | 
 | ||||||
| 		stats.processed++ | 		stats.processed++ | ||||||
| 
 | 
 | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	// check and wait for the nonce error channel and
 |  | ||||||
| 	// make sure no nonce error was thrown in the process
 |  | ||||||
| 	err := <-nonceErrCh |  | ||||||
| 	if err != nil { |  | ||||||
| 		return 0, err |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	if (stats.queued > 0 || stats.processed > 0 || stats.ignored > 0) && bool(glog.V(logger.Info)) { | 	if (stats.queued > 0 || stats.processed > 0 || stats.ignored > 0) && bool(glog.V(logger.Info)) { | ||||||
| 		tend := time.Since(tstart) | 		tend := time.Since(tstart) | ||||||
| 		start, end := chain[0], chain[len(chain)-1] | 		start, end := chain[0], chain[len(chain)-1] | ||||||
| @ -663,7 +669,7 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) { | |||||||
| 
 | 
 | ||||||
| // diff takes two blocks, an old chain and a new chain and will reconstruct the blocks and inserts them
 | // diff takes two blocks, an old chain and a new chain and will reconstruct the blocks and inserts them
 | ||||||
| // to be part of the new canonical chain.
 | // to be part of the new canonical chain.
 | ||||||
| func (self *ChainManager) diff(oldBlock, newBlock *types.Block) types.Blocks { | func (self *ChainManager) diff(oldBlock, newBlock *types.Block) (types.Blocks, error) { | ||||||
| 	var ( | 	var ( | ||||||
| 		newChain    types.Blocks | 		newChain    types.Blocks | ||||||
| 		commonBlock *types.Block | 		commonBlock *types.Block | ||||||
| @ -675,10 +681,17 @@ func (self *ChainManager) diff(oldBlock, newBlock *types.Block) types.Blocks { | |||||||
| 	if oldBlock.NumberU64() > newBlock.NumberU64() { | 	if oldBlock.NumberU64() > newBlock.NumberU64() { | ||||||
| 		// reduce old chain
 | 		// reduce old chain
 | ||||||
| 		for oldBlock = oldBlock; oldBlock.NumberU64() != newBlock.NumberU64(); oldBlock = self.GetBlock(oldBlock.ParentHash()) { | 		for oldBlock = oldBlock; oldBlock.NumberU64() != newBlock.NumberU64(); oldBlock = self.GetBlock(oldBlock.ParentHash()) { | ||||||
|  | 			if oldBlock == nil { | ||||||
|  | 				return nil, fmt.Errorf("Invalid old chain") | ||||||
|  | 			} | ||||||
| 		} | 		} | ||||||
| 	} else { | 	} else { | ||||||
| 		// reduce new chain and append new chain blocks for inserting later on
 | 		// reduce new chain and append new chain blocks for inserting later on
 | ||||||
| 		for newBlock = newBlock; newBlock.NumberU64() != oldBlock.NumberU64(); newBlock = self.GetBlock(newBlock.ParentHash()) { | 		for newBlock = newBlock; newBlock.NumberU64() != oldBlock.NumberU64(); newBlock = self.GetBlock(newBlock.ParentHash()) { | ||||||
|  | 			if newBlock == nil { | ||||||
|  | 				return nil, fmt.Errorf("Invalid new chain") | ||||||
|  | 			} | ||||||
|  | 
 | ||||||
| 			newChain = append(newChain, newBlock) | 			newChain = append(newChain, newBlock) | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| @ -692,6 +705,12 @@ func (self *ChainManager) diff(oldBlock, newBlock *types.Block) types.Blocks { | |||||||
| 		newChain = append(newChain, newBlock) | 		newChain = append(newChain, newBlock) | ||||||
| 
 | 
 | ||||||
| 		oldBlock, newBlock = self.GetBlock(oldBlock.ParentHash()), self.GetBlock(newBlock.ParentHash()) | 		oldBlock, newBlock = self.GetBlock(oldBlock.ParentHash()), self.GetBlock(newBlock.ParentHash()) | ||||||
|  | 		if oldBlock == nil { | ||||||
|  | 			return nil, fmt.Errorf("Invalid old chain") | ||||||
|  | 		} | ||||||
|  | 		if newBlock == nil { | ||||||
|  | 			return nil, fmt.Errorf("Invalid new chain") | ||||||
|  | 		} | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	if glog.V(logger.Info) { | 	if glog.V(logger.Info) { | ||||||
| @ -699,17 +718,22 @@ func (self *ChainManager) diff(oldBlock, newBlock *types.Block) types.Blocks { | |||||||
| 		glog.Infof("Fork detected @ %x. Reorganising chain from #%v %x to %x", commonHash[:4], numSplit, oldStart.Hash().Bytes()[:4], newStart.Hash().Bytes()[:4]) | 		glog.Infof("Fork detected @ %x. Reorganising chain from #%v %x to %x", commonHash[:4], numSplit, oldStart.Hash().Bytes()[:4], newStart.Hash().Bytes()[:4]) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	return newChain | 	return newChain, nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // merge merges two different chain to the new canonical chain
 | // merge merges two different chain to the new canonical chain
 | ||||||
| func (self *ChainManager) merge(oldBlock, newBlock *types.Block) { | func (self *ChainManager) merge(oldBlock, newBlock *types.Block) error { | ||||||
| 	newChain := self.diff(oldBlock, newBlock) | 	newChain, err := self.diff(oldBlock, newBlock) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return fmt.Errorf("chain reorg failed: %v", err) | ||||||
|  | 	} | ||||||
| 
 | 
 | ||||||
| 	// insert blocks. Order does not matter. Last block will be written in ImportChain itself which creates the new head properly
 | 	// insert blocks. Order does not matter. Last block will be written in ImportChain itself which creates the new head properly
 | ||||||
| 	for _, block := range newChain { | 	for _, block := range newChain { | ||||||
| 		self.insert(block) | 		self.insert(block) | ||||||
| 	} | 	} | ||||||
|  | 
 | ||||||
|  | 	return nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (self *ChainManager) update() { | func (self *ChainManager) update() { | ||||||
| @ -808,3 +832,11 @@ func verifyNonce(pow pow.PoW, in <-chan *types.Block, done chan<- error) { | |||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  | 
 | ||||||
|  | func verifyBlockNonce(pow pow.PoW, block *types.Block, done chan<- error) { | ||||||
|  | 	if !pow.Verify(block) { | ||||||
|  | 		done <- ValidationError("Block(#%v) nonce is invalid (= %x)", block.Number(), block.Nonce) | ||||||
|  | 	} else { | ||||||
|  | 		done <- nil | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | |||||||
		Loading…
	
		Reference in New Issue
	
	Block a user