Make sure that statediff has been processed before pruning

This commit is contained in:
Elizabeth Engelman 2019-02-08 15:15:36 -06:00
parent 498831ec13
commit 79686c98b0
7 changed files with 162 additions and 24 deletions

View File

@ -155,6 +155,10 @@ func makeFullNode(ctx *cli.Context) *node.Node {
if ctx.GlobalIsSet(utils.ConstantinopleOverrideFlag.Name) { if ctx.GlobalIsSet(utils.ConstantinopleOverrideFlag.Name) {
cfg.Eth.ConstantinopleOverride = new(big.Int).SetUint64(ctx.GlobalUint64(utils.ConstantinopleOverrideFlag.Name)) cfg.Eth.ConstantinopleOverride = new(big.Int).SetUint64(ctx.GlobalUint64(utils.ConstantinopleOverrideFlag.Name))
} }
if ctx.GlobalBool(utils.StateDiffFlag.Name) {
cfg.Eth.StateDiff = true
}
utils.RegisterEthService(stack, &cfg.Eth) utils.RegisterEthService(stack, &cfg.Eth)
if ctx.GlobalBool(utils.DashboardEnabledFlag.Name) { if ctx.GlobalBool(utils.DashboardEnabledFlag.Name) {

View File

@ -75,6 +75,7 @@ type CacheConfig struct {
TrieCleanLimit int // Memory allowance (MB) to use for caching trie nodes in memory TrieCleanLimit int // Memory allowance (MB) to use for caching trie nodes in memory
TrieDirtyLimit int // Memory limit (MB) at which to start flushing dirty trie nodes to disk TrieDirtyLimit int // Memory limit (MB) at which to start flushing dirty trie nodes to disk
TrieTimeLimit time.Duration // Time limit after which to flush the current in-memory trie to disk TrieTimeLimit time.Duration // Time limit after which to flush the current in-memory trie to disk
ProcessStateDiffs bool
} }
// BlockChain represents the canonical chain given a database with a genesis // BlockChain represents the canonical chain given a database with a genesis
@ -136,6 +137,8 @@ type BlockChain struct {
badBlocks *lru.Cache // Bad block cache badBlocks *lru.Cache // Bad block cache
shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block. shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block.
stateDiffsProcessed map[common.Hash]int
} }
// NewBlockChain returns a fully initialised block chain using information // NewBlockChain returns a fully initialised block chain using information
@ -155,7 +158,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
blockCache, _ := lru.New(blockCacheLimit) blockCache, _ := lru.New(blockCacheLimit)
futureBlocks, _ := lru.New(maxFutureBlocks) futureBlocks, _ := lru.New(maxFutureBlocks)
badBlocks, _ := lru.New(badBlockLimit) badBlocks, _ := lru.New(badBlockLimit)
stateDiffsProcessed := make(map[common.Hash]int)
bc := &BlockChain{ bc := &BlockChain{
chainConfig: chainConfig, chainConfig: chainConfig,
cacheConfig: cacheConfig, cacheConfig: cacheConfig,
@ -172,7 +175,9 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
engine: engine, engine: engine,
vmConfig: vmConfig, vmConfig: vmConfig,
badBlocks: badBlocks, badBlocks: badBlocks,
stateDiffsProcessed: stateDiffsProcessed,
} }
bc.SetValidator(NewBlockValidator(chainConfig, bc, engine)) bc.SetValidator(NewBlockValidator(chainConfig, bc, engine))
bc.SetProcessor(NewStateProcessor(chainConfig, bc, engine)) bc.SetProcessor(NewStateProcessor(chainConfig, bc, engine))
@ -922,6 +927,20 @@ func (bc *BlockChain) WriteBlockWithoutState(block *types.Block, td *big.Int) (e
return nil return nil
} }
func (bc *BlockChain) AddToStateDiffProcessedCollection(hash common.Hash) {
count, ok := bc.stateDiffsProcessed[hash]
if count > 1 {
log.Error("count is too high", "count", count, "hash", hash.Hex())
}
if ok {
count++
bc.stateDiffsProcessed[hash] = count
} else {
bc.stateDiffsProcessed[hash] = 1
}
}
// WriteBlockWithState writes the block and all associated state to the database. // WriteBlockWithState writes the block and all associated state to the database.
func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.Receipt, state *state.StateDB) (status WriteStatus, err error) { func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.Receipt, state *state.StateDB) (status WriteStatus, err error) {
bc.wg.Add(1) bc.wg.Add(1)
@ -994,6 +1013,30 @@ func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.
bc.triegc.Push(root, number) bc.triegc.Push(root, number)
break break
} }
if bc.cacheConfig.ProcessStateDiffs {
count, ok := bc.stateDiffsProcessed[root.(common.Hash)]
//if we haven't processed the statediff for a given state root and it's child, don't dereference it yet
if !ok {
log.Info("Current root NOT found root in stateDiffsProcessed", "root", root.(common.Hash).Hex())
bc.triegc.Push(root, number)
break
}
if count < 2 {
log.Info("Current root has not yet been processed for it's child", "root", root.(common.Hash).Hex())
bc.triegc.Push(root, number)
break
} else {
log.Warn("Current root found in stateDiffsProcessed collection with a count of 2, okay to dereference",
"root", root.(common.Hash).Hex(),
"blockNumber", uint64(-number),
"size of stateDiffsProcessed", len(bc.stateDiffsProcessed))
delete(bc.stateDiffsProcessed, root.(common.Hash))
}
}
log.Info("DEREFERENCING", "root", root.(common.Hash).Hex())
triedb.Dereference(root.(common.Hash)) triedb.Dereference(root.(common.Hash))
} }
} }

View File

@ -1483,3 +1483,84 @@ func BenchmarkBlockChain_1x1000Executions(b *testing.B) {
benchmarkLargeNumberOfValueToNonexisting(b, numTxs, numBlocks, recipientFn, dataFn) benchmarkLargeNumberOfValueToNonexisting(b, numTxs, numBlocks, recipientFn, dataFn)
} }
func TestProcessingStateDiffs(t *testing.T) {
defaultTrieCleanCache := 256
defaultTrieDirtyCache := 256
defaultTrieTimeout := 60 * time.Minute
cacheConfig := &CacheConfig{
Disabled: false,
TrieCleanLimit: defaultTrieCleanCache,
TrieDirtyLimit: defaultTrieDirtyCache,
TrieTimeLimit: defaultTrieTimeout,
ProcessStateDiffs: true,
}
db := ethdb.NewMemDatabase()
genesis := new(Genesis).MustCommit(db)
numberOfBlocks := triesInMemory
engine := ethash.NewFaker()
blockchain, _ := NewBlockChain(db, cacheConfig, params.AllEthashProtocolChanges, engine, vm.Config{}, nil)
blocks := makeBlockChain(genesis, numberOfBlocks + 1, engine, db, canonicalSeed)
_, err := blockchain.InsertChain(blocks)
if err != nil {
t.Fatalf("failed to create pristine chain: %v", err)
}
defer blockchain.Stop()
//when adding a root hash to the collection, it will increment the count
firstStateRoot := blocks[0].Root()
blockchain.AddToStateDiffProcessedCollection(firstStateRoot)
value, ok := blockchain.stateDiffsProcessed[firstStateRoot]
if !ok {
t.Error("state root not found in collection")
}
if value != 1 {
t.Error("state root count not correct", "want", 1, "got", value)
}
blockchain.AddToStateDiffProcessedCollection(firstStateRoot)
value, ok = blockchain.stateDiffsProcessed[firstStateRoot]
if !ok {
t.Error("state root not found in collection")
}
if value != 2 {
t.Error("state root count not correct", "want", 2, "got", value)
}
moreBlocks := makeBlockChain(blocks[len(blocks) - 1], 1, engine, db, canonicalSeed)
_, err = blockchain.InsertChain(moreBlocks)
//a root hash can be dereferenced when it's state diff and it's child's state diff have been processed
//(i.e. it has a count of 2 in stateDiffsProcessed)
nodes := blockchain.stateCache.TrieDB().Nodes()
if containsRootHash(nodes, firstStateRoot) {
t.Errorf("stateRoot %s in nodes, want: %t, got: %t", firstStateRoot.Hex(), false, true)
}
//a root hash should still be in the in-mem db if it's child's state diff hasn't yet been processed
//(i.e. it has a count of 1 stateDiffsProcessed)
secondStateRoot := blocks[1].Root()
blockchain.AddToStateDiffProcessedCollection(secondStateRoot)
if !containsRootHash(nodes, secondStateRoot) {
t.Errorf("stateRoot %s in nodes, want: %t, got: %t", secondStateRoot.Hex(), true, false)
}
//the stateDiffsProcessed collection is cleaned up once a hash has been dereferenced
_, ok = blockchain.stateDiffsProcessed[firstStateRoot]
if ok {
t.Errorf("stateRoot %s in stateDiffsProcessed collection, want: %t, got: %t",
firstStateRoot.Hex(),
false,
ok,
)
}
}
func containsRootHash(collection []common.Hash, hash common.Hash) bool{
for _, n := range collection {
if n == hash {
return true
}
}
return false
}

View File

@ -161,6 +161,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
TrieCleanLimit: config.TrieCleanCache, TrieCleanLimit: config.TrieCleanCache,
TrieDirtyLimit: config.TrieDirtyCache, TrieDirtyLimit: config.TrieDirtyCache,
TrieTimeLimit: config.TrieTimeout, TrieTimeLimit: config.TrieTimeout,
ProcessStateDiffs: config.StateDiff,
} }
) )
eth.blockchain, err = core.NewBlockChain(chainDb, cacheConfig, eth.chainConfig, eth.engine, vmConfig, eth.shouldPreserve) eth.blockchain, err = core.NewBlockChain(chainDb, cacheConfig, eth.chainConfig, eth.engine, vmConfig, eth.shouldPreserve)

View File

@ -59,6 +59,8 @@ var DefaultConfig = Config{
Blocks: 20, Blocks: 20,
Percentile: 60, Percentile: 60,
}, },
StateDiff: false,
} }
func init() { func init() {
@ -135,6 +137,8 @@ type Config struct {
// Constantinople block override (TODO: remove after the fork) // Constantinople block override (TODO: remove after the fork)
ConstantinopleOverride *big.Int ConstantinopleOverride *big.Int
StateDiff bool
} }
type configMarshaling struct { type configMarshaling struct {

View File

@ -19,6 +19,7 @@ import (
type BlockChain interface { type BlockChain interface {
SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
GetBlockByHash(hash common.Hash) *types.Block GetBlockByHash(hash common.Hash) *types.Block
AddToStateDiffProcessedCollection(hash common.Hash)
} }
type StateDiffService struct { type StateDiffService struct {
@ -94,6 +95,8 @@ HandleBlockChLoop:
log.Error("Error extracting statediff", "block number", currentBlock.Number(), "error", err) log.Error("Error extracting statediff", "block number", currentBlock.Number(), "error", err)
} else { } else {
log.Info("Statediff extracted", "block number", currentBlock.Number(), "location", stateDiffLocation) log.Info("Statediff extracted", "block number", currentBlock.Number(), "location", stateDiffLocation)
sds.BlockChain.AddToStateDiffProcessedCollection(parentBlock.Root())
sds.BlockChain.AddToStateDiffProcessedCollection(currentBlock.Root())
} }
case <-quitCh: case <-quitCh:
log.Debug("Quitting the statediff block channel") log.Debug("Quitting the statediff block channel")

View File

@ -16,6 +16,8 @@ type BlockChain struct {
ChainEvents []core.ChainEvent ChainEvents []core.ChainEvent
} }
func (mc *BlockChain) AddToStateDiffProcessedCollection(hash common.Hash) {}
func (mc *BlockChain) SetParentBlocksToReturn(blocks []*types.Block) { func (mc *BlockChain) SetParentBlocksToReturn(blocks []*types.Block) {
mc.parentBlocksToReturn = blocks mc.parentBlocksToReturn = blocks
} }