From bb9631c399392577b1d69a1e8f88a2ccbd05e4e1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Mon, 25 Mar 2019 12:41:50 +0200 Subject: [PATCH] core: prefetch next block state concurrently --- core/blockchain.go | 98 +++++++++++++++++++++------------------ core/blockchain_insert.go | 43 ++++++++++++++--- core/blockchain_test.go | 2 +- core/state_prefetcher.go | 85 +++++++++++++++++++++++++++++++++ core/types.go | 17 ++++--- core/vm/interpreter.go | 25 ++++------ 6 files changed, 194 insertions(+), 76 deletions(-) create mode 100644 core/state_prefetcher.go diff --git a/core/blockchain.go b/core/blockchain.go index 745f67d8c..a547f6bc3 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -62,6 +62,9 @@ var ( blockExecutionTimer = metrics.NewRegisteredTimer("chain/execution", nil) blockWriteTimer = metrics.NewRegisteredTimer("chain/write", nil) + blockPrefetchExecuteTimer = metrics.NewRegisteredTimer("chain/prefetch/executes", nil) + blockPrefetchInterruptMeter = metrics.NewRegisteredMeter("chain/prefetch/interrupts", nil) + ErrNoGenesis = errors.New("Genesis not found in chain") ) @@ -126,7 +129,6 @@ type BlockChain struct { genesisBlock *types.Block chainmu sync.RWMutex // blockchain insertion lock - procmu sync.RWMutex // block processor lock checkpoint int // checkpoint counts towards the new checkpoint currentBlock atomic.Value // Current head of the block chain @@ -145,10 +147,11 @@ type BlockChain struct { procInterrupt int32 // interrupt signaler for block processing wg sync.WaitGroup // chain processing wait group for shutting down - engine consensus.Engine - processor Processor // block processor interface - validator Validator // block and state validator interface - vmConfig vm.Config + engine consensus.Engine + validator Validator // Block and state validator interface + prefetcher Prefetcher // Block state prefetcher interface + processor Processor // Block transaction processor interface + vmConfig vm.Config badBlocks *lru.Cache // Bad block cache shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block. @@ -189,8 +192,9 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par vmConfig: vmConfig, badBlocks: badBlocks, } - bc.SetValidator(NewBlockValidator(chainConfig, bc, engine)) - bc.SetProcessor(NewStateProcessor(chainConfig, bc, engine)) + bc.validator = NewBlockValidator(chainConfig, bc, engine) + bc.prefetcher = newStatePrefetcher(chainConfig, bc, engine) + bc.processor = NewStateProcessor(chainConfig, bc, engine) var err error bc.hc, err = NewHeaderChain(db, chainConfig, engine, bc.getProcInterrupt) @@ -381,31 +385,13 @@ func (bc *BlockChain) CurrentFastBlock() *types.Block { return bc.currentFastBlock.Load().(*types.Block) } -// SetProcessor sets the processor required for making state modifications. -func (bc *BlockChain) SetProcessor(processor Processor) { - bc.procmu.Lock() - defer bc.procmu.Unlock() - bc.processor = processor -} - -// SetValidator sets the validator which is used to validate incoming blocks. -func (bc *BlockChain) SetValidator(validator Validator) { - bc.procmu.Lock() - defer bc.procmu.Unlock() - bc.validator = validator -} - // Validator returns the current validator. func (bc *BlockChain) Validator() Validator { - bc.procmu.RLock() - defer bc.procmu.RUnlock() return bc.validator } // Processor returns the current processor. func (bc *BlockChain) Processor() Processor { - bc.procmu.RLock() - defer bc.procmu.RUnlock() return bc.processor } @@ -1147,7 +1133,7 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) { // is imported, but then new canon-head is added before the actual sidechain // completes, then the historic state could be pruned again func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []interface{}, []*types.Log, error) { - // If the chain is terminating, don't even bother starting u + // If the chain is terminating, don't even bother starting up if atomic.LoadInt32(&bc.procInterrupt) == 1 { return 0, nil, nil, nil } @@ -1175,7 +1161,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] defer close(abort) // Peek the error for the first block to decide the directing import logic - it := newInsertIterator(chain, results, bc.Validator()) + it := newInsertIterator(chain, results, bc.validator) block, err := it.next() @@ -1238,54 +1224,74 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] if parent == nil { parent = bc.GetHeader(block.ParentHash(), block.NumberU64()-1) } - state, err := state.New(parent.Root, bc.stateCache) + statedb, err := state.New(parent.Root, bc.stateCache) if err != nil { return it.index, events, coalescedLogs, err } - // Process block using the parent state as reference point. + // If we have a followup block, run that against the current state to pre-cache + // transactions and probabilistically some of the account/storage trie nodes. + var followupInterrupt uint32 + + if followup, err := it.peek(); followup != nil && err == nil { + go func(start time.Time) { + throwaway, _ := state.New(parent.Root, bc.stateCache) + bc.prefetcher.Prefetch(followup, throwaway, bc.vmConfig, &followupInterrupt) + + blockPrefetchExecuteTimer.Update(time.Since(start)) + if atomic.LoadUint32(&followupInterrupt) == 1 { + blockPrefetchInterruptMeter.Mark(1) + } + }(time.Now()) + } + // Process block using the parent state as reference point substart := time.Now() - receipts, logs, usedGas, err := bc.processor.Process(block, state, bc.vmConfig) + receipts, logs, usedGas, err := bc.processor.Process(block, statedb, bc.vmConfig) if err != nil { bc.reportBlock(block, receipts, err) + atomic.StoreUint32(&followupInterrupt, 1) return it.index, events, coalescedLogs, err } // Update the metrics touched during block processing - accountReadTimer.Update(state.AccountReads) // Account reads are complete, we can mark them - storageReadTimer.Update(state.StorageReads) // Storage reads are complete, we can mark them - accountUpdateTimer.Update(state.AccountUpdates) // Account updates are complete, we can mark them - storageUpdateTimer.Update(state.StorageUpdates) // Storage updates are complete, we can mark them + accountReadTimer.Update(statedb.AccountReads) // Account reads are complete, we can mark them + storageReadTimer.Update(statedb.StorageReads) // Storage reads are complete, we can mark them + accountUpdateTimer.Update(statedb.AccountUpdates) // Account updates are complete, we can mark them + storageUpdateTimer.Update(statedb.StorageUpdates) // Storage updates are complete, we can mark them - triehash := state.AccountHashes + state.StorageHashes // Save to not double count in validation - trieproc := state.AccountReads + state.AccountUpdates - trieproc += state.StorageReads + state.StorageUpdates + triehash := statedb.AccountHashes + statedb.StorageHashes // Save to not double count in validation + trieproc := statedb.AccountReads + statedb.AccountUpdates + trieproc += statedb.StorageReads + statedb.StorageUpdates blockExecutionTimer.Update(time.Since(substart) - trieproc - triehash) // Validate the state using the default validator substart = time.Now() - if err := bc.Validator().ValidateState(block, state, receipts, usedGas); err != nil { + if err := bc.validator.ValidateState(block, statedb, receipts, usedGas); err != nil { bc.reportBlock(block, receipts, err) + atomic.StoreUint32(&followupInterrupt, 1) return it.index, events, coalescedLogs, err } proctime := time.Since(start) // Update the metrics touched during block validation - accountHashTimer.Update(state.AccountHashes) // Account hashes are complete, we can mark them - storageHashTimer.Update(state.StorageHashes) // Storage hashes are complete, we can mark them + accountHashTimer.Update(statedb.AccountHashes) // Account hashes are complete, we can mark them + storageHashTimer.Update(statedb.StorageHashes) // Storage hashes are complete, we can mark them - blockValidationTimer.Update(time.Since(substart) - (state.AccountHashes + state.StorageHashes - triehash)) + blockValidationTimer.Update(time.Since(substart) - (statedb.AccountHashes + statedb.StorageHashes - triehash)) // Write the block to the chain and get the status. substart = time.Now() - status, err := bc.writeBlockWithState(block, receipts, state) + status, err := bc.writeBlockWithState(block, receipts, statedb) if err != nil { + atomic.StoreUint32(&followupInterrupt, 1) return it.index, events, coalescedLogs, err } - // Update the metrics touched during block commit - accountCommitTimer.Update(state.AccountCommits) // Account commits are complete, we can mark them - storageCommitTimer.Update(state.StorageCommits) // Storage commits are complete, we can mark them + atomic.StoreUint32(&followupInterrupt, 1) - blockWriteTimer.Update(time.Since(substart) - state.AccountCommits - state.StorageCommits) + // Update the metrics touched during block commit + accountCommitTimer.Update(statedb.AccountCommits) // Account commits are complete, we can mark them + storageCommitTimer.Update(statedb.StorageCommits) // Storage commits are complete, we can mark them + + blockWriteTimer.Update(time.Since(substart) - statedb.AccountCommits - statedb.StorageCommits) blockInsertTimer.UpdateSince(start) switch status { diff --git a/core/blockchain_insert.go b/core/blockchain_insert.go index e2a385164..e4d758d4c 100644 --- a/core/blockchain_insert.go +++ b/core/blockchain_insert.go @@ -80,10 +80,13 @@ func (st *insertStats) report(chain []*types.Block, index int, dirty common.Stor // insertIterator is a helper to assist during chain import. type insertIterator struct { - chain types.Blocks - results <-chan error - index int - validator Validator + chain types.Blocks // Chain of blocks being iterated over + + results <-chan error // Verification result sink from the consensus engine + errors []error // Header verification errors for the blocks + + index int // Current offset of the iterator + validator Validator // Validator to run if verification succeeds } // newInsertIterator creates a new iterator based on the given blocks, which are @@ -92,6 +95,7 @@ func newInsertIterator(chain types.Blocks, results <-chan error, validator Valid return &insertIterator{ chain: chain, results: results, + errors: make([]error, 0, len(chain)), index: -1, validator: validator, } @@ -100,17 +104,44 @@ func newInsertIterator(chain types.Blocks, results <-chan error, validator Valid // next returns the next block in the iterator, along with any potential validation // error for that block. When the end is reached, it will return (nil, nil). func (it *insertIterator) next() (*types.Block, error) { + // If we reached the end of the chain, abort if it.index+1 >= len(it.chain) { it.index = len(it.chain) return nil, nil } + // Advance the iterator and wait for verification result if not yet done it.index++ - if err := <-it.results; err != nil { - return it.chain[it.index], err + if len(it.errors) <= it.index { + it.errors = append(it.errors, <-it.results) } + if it.errors[it.index] != nil { + return it.chain[it.index], it.errors[it.index] + } + // Block header valid, run body validation and return return it.chain[it.index], it.validator.ValidateBody(it.chain[it.index]) } +// peek returns the next block in the iterator, along with any potential validation +// error for that block, but does **not** advance the iterator. +// +// Both header and body validation errors (nil too) is cached into the iterator +// to avoid duplicating work on the following next() call. +func (it *insertIterator) peek() (*types.Block, error) { + // If we reached the end of the chain, abort + if it.index+1 >= len(it.chain) { + return nil, nil + } + // Wait for verification result if not yet done + if len(it.errors) <= it.index+1 { + it.errors = append(it.errors, <-it.results) + } + if it.errors[it.index+1] != nil { + return it.chain[it.index+1], it.errors[it.index+1] + } + // Block header valid, ignore body validation since we don't have a parent anyway + return it.chain[it.index+1], nil +} + // previous returns the previous header that was being processed, or nil. func (it *insertIterator) previous() *types.Header { if it.index < 1 { diff --git a/core/blockchain_test.go b/core/blockchain_test.go index d1681ce3b..c8cae969c 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -144,7 +144,7 @@ func testBlockChainImport(chain types.Blocks, blockchain *BlockChain) error { if err != nil { return err } - receipts, _, usedGas, err := blockchain.Processor().Process(block, statedb, vm.Config{}) + receipts, _, usedGas, err := blockchain.processor.Process(block, statedb, vm.Config{}) if err != nil { blockchain.reportBlock(block, receipts, err) return err diff --git a/core/state_prefetcher.go b/core/state_prefetcher.go new file mode 100644 index 000000000..cb85a05b5 --- /dev/null +++ b/core/state_prefetcher.go @@ -0,0 +1,85 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package core + +import ( + "sync/atomic" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/consensus" + "github.com/ethereum/go-ethereum/core/state" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/core/vm" + "github.com/ethereum/go-ethereum/params" +) + +// statePrefetcher is a basic Prefetcher, which blindly executes a block on top +// of an arbitrary state with the goal of prefetching potentially useful state +// data from disk before the main block processor start executing. +type statePrefetcher struct { + config *params.ChainConfig // Chain configuration options + bc *BlockChain // Canonical block chain + engine consensus.Engine // Consensus engine used for block rewards +} + +// newStatePrefetcher initialises a new statePrefetcher. +func newStatePrefetcher(config *params.ChainConfig, bc *BlockChain, engine consensus.Engine) *statePrefetcher { + return &statePrefetcher{ + config: config, + bc: bc, + engine: engine, + } +} + +// Prefetch processes the state changes according to the Ethereum rules by running +// the transaction messages using the statedb, but any changes are discarded. The +// only goal is to pre-cache transaction signatures and state trie nodes. +func (p *statePrefetcher) Prefetch(block *types.Block, statedb *state.StateDB, cfg vm.Config, interrupt *uint32) { + var ( + header = block.Header() + gaspool = new(GasPool).AddGas(block.GasLimit()) + ) + // Iterate over and process the individual transactions + for i, tx := range block.Transactions() { + // If block precaching was interrupted, abort + if interrupt != nil && atomic.LoadUint32(interrupt) == 1 { + return + } + // Block precaching permitted to continue, execute the transaction + statedb.Prepare(tx.Hash(), block.Hash(), i) + if err := precacheTransaction(p.config, p.bc, nil, gaspool, statedb, header, tx, cfg); err != nil { + return // Ugh, something went horribly wrong, bail out + } + } +} + +// precacheTransaction attempts to apply a transaction to the given state database +// and uses the input parameters for its environment. The goal is not to execute +// the transaction successfully, rather to warm up touched data slots. +func precacheTransaction(config *params.ChainConfig, bc ChainContext, author *common.Address, gaspool *GasPool, statedb *state.StateDB, header *types.Header, tx *types.Transaction, cfg vm.Config) error { + // Convert the transaction into an executable message and pre-cache its sender + msg, err := tx.AsMessage(types.MakeSigner(config, header.Number)) + if err != nil { + return err + } + // Create the EVM and execute the transaction + context := NewEVMContext(msg, header, bc, author) + vm := vm.NewEVM(context, statedb, config, cfg) + + _, _, _, err = ApplyMessage(vm, msg, gaspool) + return err +} diff --git a/core/types.go b/core/types.go index 5c963e665..4c5b74a49 100644 --- a/core/types.go +++ b/core/types.go @@ -25,7 +25,6 @@ import ( // Validator is an interface which defines the standard for block validation. It // is only responsible for validating block contents, as the header validation is // done by the specific consensus engines. -// type Validator interface { // ValidateBody validates the given block's content. ValidateBody(block *types.Block) error @@ -35,12 +34,18 @@ type Validator interface { ValidateState(block *types.Block, state *state.StateDB, receipts types.Receipts, usedGas uint64) error } +// Prefetcher is an interface for pre-caching transaction signatures and state. +type Prefetcher interface { + // Prefetch processes the state changes according to the Ethereum rules by running + // the transaction messages using the statedb, but any changes are discarded. The + // only goal is to pre-cache transaction signatures and state trie nodes. + Prefetch(block *types.Block, statedb *state.StateDB, cfg vm.Config, interrupt *uint32) +} + // Processor is an interface for processing blocks using a given initial state. -// -// Process takes the block to be processed and the statedb upon which the -// initial state is based. It should return the receipts generated, amount -// of gas used in the process and return an error if any of the internal rules -// failed. type Processor interface { + // Process processes the state changes according to the Ethereum rules by running + // the transaction messages using the statedb and applying any rewards to both + // the processor (coinbase) and any included uncles. Process(block *types.Block, statedb *state.StateDB, cfg vm.Config) (types.Receipts, []*types.Log, uint64, error) } diff --git a/core/vm/interpreter.go b/core/vm/interpreter.go index 417665370..989f85f5d 100644 --- a/core/vm/interpreter.go +++ b/core/vm/interpreter.go @@ -28,24 +28,15 @@ import ( // Config are the configuration options for the Interpreter type Config struct { - // Debug enabled debugging Interpreter options - Debug bool - // Tracer is the op code logger - Tracer Tracer - // NoRecursion disabled Interpreter call, callcode, - // delegate call and create. - NoRecursion bool - // Enable recording of SHA3/keccak preimages - EnablePreimageRecording bool - // JumpTable contains the EVM instruction table. This - // may be left uninitialised and will be set to the default - // table. - JumpTable [256]operation + Debug bool // Enables debugging + Tracer Tracer // Opcode logger + NoRecursion bool // Disables call, callcode, delegate call and create + EnablePreimageRecording bool // Enables recording of SHA3/keccak preimages - // Type of the EWASM interpreter - EWASMInterpreter string - // Type of the EVM interpreter - EVMInterpreter string + JumpTable [256]operation // EVM instruction table, automatically populated if unset + + EWASMInterpreter string // External EWASM interpreter options + EVMInterpreter string // External EVM interpreter options } // Interpreter is used to run Ethereum based contracts and will utilise the