core: only fire one chain head per batch (#15123)

* core: only fire one chain head per batch

* miner: announce chan events synchronously
This commit is contained in:
Péter Szilágyi 2017-09-11 13:13:05 +03:00 committed by GitHub
parent 5596b664c4
commit 10b3f97c9d
3 changed files with 63 additions and 67 deletions

View File

@ -854,9 +854,22 @@ func (bc *BlockChain) WriteBlockAndState(block *types.Block, receipts []*types.R
return status, nil return status, nil
} }
// InsertChain will attempt to insert the given chain in to the canonical chain or, otherwise, create a fork. If an error is returned // InsertChain attempts to insert the given batch of blocks in to the canonical
// it will return the index number of the failing block as well an error describing what went wrong (for possible errors see core/errors.go). // chain or, otherwise, create a fork. If an error is returned it will return
// the index number of the failing block as well an error describing what went
// wrong.
//
// After insertion is done, all accumulated events will be fired.
func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) { func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
n, events, logs, err := bc.insertChain(chain)
bc.PostChainEvents(events, logs)
return n, err
}
// insertChain will execute the actual chain insertion and event aggregation. The
// only reason this method exists as a separate one is to make locking cleaner
// with deferred statements.
func (bc *BlockChain) insertChain(chain types.Blocks) (int, []interface{}, []*types.Log, error) {
// Do a sanity check that the provided chain is actually ordered and linked // Do a sanity check that the provided chain is actually ordered and linked
for i := 1; i < len(chain); i++ { for i := 1; i < len(chain); i++ {
if chain[i].NumberU64() != chain[i-1].NumberU64()+1 || chain[i].ParentHash() != chain[i-1].Hash() { if chain[i].NumberU64() != chain[i-1].NumberU64()+1 || chain[i].ParentHash() != chain[i-1].Hash() {
@ -864,7 +877,7 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
log.Error("Non contiguous block insert", "number", chain[i].Number(), "hash", chain[i].Hash(), log.Error("Non contiguous block insert", "number", chain[i].Number(), "hash", chain[i].Hash(),
"parent", chain[i].ParentHash(), "prevnumber", chain[i-1].Number(), "prevhash", chain[i-1].Hash()) "parent", chain[i].ParentHash(), "prevnumber", chain[i-1].Number(), "prevhash", chain[i-1].Hash())
return 0, fmt.Errorf("non contiguous insert: item %d is #%d [%x…], item %d is #%d [%x…] (parent [%x…])", i-1, chain[i-1].NumberU64(), return 0, nil, nil, fmt.Errorf("non contiguous insert: item %d is #%d [%x…], item %d is #%d [%x…] (parent [%x…])", i-1, chain[i-1].NumberU64(),
chain[i-1].Hash().Bytes()[:4], i, chain[i].NumberU64(), chain[i].Hash().Bytes()[:4], chain[i].ParentHash().Bytes()[:4]) chain[i-1].Hash().Bytes()[:4], i, chain[i].NumberU64(), chain[i].Hash().Bytes()[:4], chain[i].ParentHash().Bytes()[:4])
} }
} }
@ -881,6 +894,7 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
var ( var (
stats = insertStats{startTime: mclock.Now()} stats = insertStats{startTime: mclock.Now()}
events = make([]interface{}, 0, len(chain)) events = make([]interface{}, 0, len(chain))
lastCanon *types.Block
coalescedLogs []*types.Log coalescedLogs []*types.Log
) )
// Start the parallel header verifier // Start the parallel header verifier
@ -904,7 +918,7 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
// If the header is a banned one, straight out abort // If the header is a banned one, straight out abort
if BadHashes[block.Hash()] { if BadHashes[block.Hash()] {
bc.reportBlock(block, nil, ErrBlacklistedHash) bc.reportBlock(block, nil, ErrBlacklistedHash)
return i, ErrBlacklistedHash return i, events, coalescedLogs, ErrBlacklistedHash
} }
// Wait for the block's verification to complete // Wait for the block's verification to complete
bstart := time.Now() bstart := time.Now()
@ -925,7 +939,7 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
// if given. // if given.
max := big.NewInt(time.Now().Unix() + maxTimeFutureBlocks) max := big.NewInt(time.Now().Unix() + maxTimeFutureBlocks)
if block.Time().Cmp(max) > 0 { if block.Time().Cmp(max) > 0 {
return i, fmt.Errorf("future block: %v > %v", block.Time(), max) return i, events, coalescedLogs, fmt.Errorf("future block: %v > %v", block.Time(), max)
} }
bc.futureBlocks.Add(block.Hash(), block) bc.futureBlocks.Add(block.Hash(), block)
stats.queued++ stats.queued++
@ -939,7 +953,7 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
} }
bc.reportBlock(block, nil, err) bc.reportBlock(block, nil, err)
return i, err return i, events, coalescedLogs, err
} }
// Create a new statedb using the parent block and report an // Create a new statedb using the parent block and report an
// error if it fails. // error if it fails.
@ -951,40 +965,35 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
} }
state, err := state.New(parent.Root(), bc.stateCache) state, err := state.New(parent.Root(), bc.stateCache)
if err != nil { if err != nil {
return i, err return i, events, coalescedLogs, err
} }
// Process block using the parent state as reference point. // Process block using the parent state as reference point.
receipts, logs, usedGas, err := bc.processor.Process(block, state, bc.vmConfig) receipts, logs, usedGas, err := bc.processor.Process(block, state, bc.vmConfig)
if err != nil { if err != nil {
bc.reportBlock(block, receipts, err) bc.reportBlock(block, receipts, err)
return i, err return i, events, coalescedLogs, err
} }
// Validate the state using the default validator // Validate the state using the default validator
err = bc.Validator().ValidateState(block, parent, state, receipts, usedGas) err = bc.Validator().ValidateState(block, parent, state, receipts, usedGas)
if err != nil { if err != nil {
bc.reportBlock(block, receipts, err) bc.reportBlock(block, receipts, err)
return i, err return i, events, coalescedLogs, err
} }
// Write the block to the chain and get the status. // Write the block to the chain and get the status.
status, err := bc.WriteBlockAndState(block, receipts, state) status, err := bc.WriteBlockAndState(block, receipts, state)
if err != nil { if err != nil {
return i, err return i, events, coalescedLogs, err
} }
switch status { switch status {
case CanonStatTy: case CanonStatTy:
log.Debug("Inserted new block", "number", block.Number(), "hash", block.Hash(), "uncles", len(block.Uncles()), log.Debug("Inserted new block", "number", block.Number(), "hash", block.Hash(), "uncles", len(block.Uncles()),
"txs", len(block.Transactions()), "gas", block.GasUsed(), "elapsed", common.PrettyDuration(time.Since(bstart))) "txs", len(block.Transactions()), "gas", block.GasUsed(), "elapsed", common.PrettyDuration(time.Since(bstart)))
// coalesce logs for later processing
coalescedLogs = append(coalescedLogs, logs...) coalescedLogs = append(coalescedLogs, logs...)
blockInsertTimer.UpdateSince(bstart) blockInsertTimer.UpdateSince(bstart)
events = append(events, ChainEvent{block, block.Hash(), logs}) events = append(events, ChainEvent{block, block.Hash(), logs})
// We need some control over the mining operation. Acquiring locks and waiting lastCanon = block
// for the miner to create new block takes too long and in most cases isn't
// even necessary.
if bc.LastBlockHash() == block.Hash() {
events = append(events, ChainHeadEvent{block})
}
case SideStatTy: case SideStatTy:
log.Debug("Inserted forked block", "number", block.Number(), "hash", block.Hash(), "diff", block.Difficulty(), "elapsed", log.Debug("Inserted forked block", "number", block.Number(), "hash", block.Hash(), "diff", block.Difficulty(), "elapsed",
common.PrettyDuration(time.Since(bstart)), "txs", len(block.Transactions()), "gas", block.GasUsed(), "uncles", len(block.Uncles())) common.PrettyDuration(time.Since(bstart)), "txs", len(block.Transactions()), "gas", block.GasUsed(), "uncles", len(block.Uncles()))
@ -996,9 +1005,11 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
stats.usedGas += usedGas.Uint64() stats.usedGas += usedGas.Uint64()
stats.report(chain, i) stats.report(chain, i)
} }
go bc.PostChainEvents(events, coalescedLogs) // Append a single chain head event if we've progressed the chain
if lastCanon != nil && bc.LastBlockHash() == lastCanon.Hash() {
return 0, nil events = append(events, ChainHeadEvent{lastCanon})
}
return 0, events, coalescedLogs, nil
} }
// insertStats tracks and reports on block insertion. // insertStats tracks and reports on block insertion.

View File

@ -935,7 +935,7 @@ func TestReorgSideEvent(t *testing.T) {
} }
gen.AddTx(tx) gen.AddTx(tx)
}) })
chainSideCh := make(chan ChainSideEvent) chainSideCh := make(chan ChainSideEvent, 64)
blockchain.SubscribeChainSideEvent(chainSideCh) blockchain.SubscribeChainSideEvent(chainSideCh)
if _, err := blockchain.InsertChain(replacementBlocks); err != nil { if _, err := blockchain.InsertChain(replacementBlocks); err != nil {
t.Fatalf("failed to insert chain: %v", err) t.Fatalf("failed to insert chain: %v", err)

View File

@ -125,8 +125,6 @@ type worker struct {
// atomic status counters // atomic status counters
mining int32 mining int32
atWork int32 atWork int32
fullValidation bool
} }
func newWorker(config *params.ChainConfig, engine consensus.Engine, coinbase common.Address, eth Backend, mux *event.TypeMux) *worker { func newWorker(config *params.ChainConfig, engine consensus.Engine, coinbase common.Address, eth Backend, mux *event.TypeMux) *worker {
@ -146,7 +144,6 @@ func newWorker(config *params.ChainConfig, engine consensus.Engine, coinbase com
coinbase: coinbase, coinbase: coinbase,
agents: make(map[Agent]struct{}), agents: make(map[Agent]struct{}),
unconfirmed: newUnconfirmedBlocks(eth.BlockChain(), miningLogAtDepth), unconfirmed: newUnconfirmedBlocks(eth.BlockChain(), miningLogAtDepth),
fullValidation: false,
} }
// Subscribe TxPreEvent for tx pool // Subscribe TxPreEvent for tx pool
worker.txSub = eth.TxPool().SubscribeTxPreEvent(worker.txCh) worker.txSub = eth.TxPool().SubscribeTxPreEvent(worker.txCh)
@ -297,50 +294,38 @@ func (self *worker) wait() {
block := result.Block block := result.Block
work := result.Work work := result.Work
if self.fullValidation { // Update the block hash in all logs since it is now available and not when the
if _, err := self.chain.InsertChain(types.Blocks{block}); err != nil { // receipt/log of individual transactions were created.
log.Error("Mined invalid block", "err", err) for _, r := range work.receipts {
continue for _, l := range r.Logs {
l.BlockHash = block.Hash()
} }
go self.mux.Post(core.NewMinedBlockEvent{Block: block})
} else {
// Update the block hash in all logs since it is now available and not when the
// receipt/log of individual transactions were created.
for _, r := range work.receipts {
for _, l := range r.Logs {
l.BlockHash = block.Hash()
}
}
for _, log := range work.state.Logs() {
log.BlockHash = block.Hash()
}
stat, err := self.chain.WriteBlockAndState(block, work.receipts, work.state)
if err != nil {
log.Error("Failed writing block to chain", "err", err)
continue
}
// check if canon block and write transactions
if stat == core.CanonStatTy {
// implicit by posting ChainHeadEvent
mustCommitNewWork = false
}
// broadcast before waiting for validation
go func(block *types.Block, logs []*types.Log, receipts []*types.Receipt) {
self.mux.Post(core.NewMinedBlockEvent{Block: block})
var (
events []interface{}
coalescedLogs []*types.Log
)
events = append(events, core.ChainEvent{Block: block, Hash: block.Hash(), Logs: logs})
if stat == core.CanonStatTy {
events = append(events, core.ChainHeadEvent{Block: block})
coalescedLogs = logs
}
// post blockchain events
self.chain.PostChainEvents(events, coalescedLogs)
}(block, work.state.Logs(), work.receipts)
} }
for _, log := range work.state.Logs() {
log.BlockHash = block.Hash()
}
stat, err := self.chain.WriteBlockAndState(block, work.receipts, work.state)
if err != nil {
log.Error("Failed writing block to chain", "err", err)
continue
}
// check if canon block and write transactions
if stat == core.CanonStatTy {
// implicit by posting ChainHeadEvent
mustCommitNewWork = false
}
// Broadcast the block and announce chain insertion event
self.mux.Post(core.NewMinedBlockEvent{Block: block})
var (
events []interface{}
logs = work.state.Logs()
)
events = append(events, core.ChainEvent{Block: block, Hash: block.Hash(), Logs: logs})
if stat == core.CanonStatTy {
events = append(events, core.ChainHeadEvent{Block: block})
}
self.chain.PostChainEvents(events, logs)
// Insert the block into the set of pending ones to wait for confirmations // Insert the block into the set of pending ones to wait for confirmations
self.unconfirmed.Insert(block.NumberU64(), block.Hash()) self.unconfirmed.Insert(block.NumberU64(), block.Hash())