From 79686c98b05194ef185a439435b5a0275a5b795a Mon Sep 17 00:00:00 2001 From: Elizabeth Engelman Date: Fri, 8 Feb 2019 15:15:36 -0600 Subject: [PATCH] Make sure that statediff has been processed before pruning --- cmd/geth/config.go | 4 ++ core/blockchain.go | 83 +++++++++++++++++------ core/blockchain_test.go | 81 ++++++++++++++++++++++ eth/backend.go | 9 +-- eth/config.go | 4 ++ statediff/service/service.go | 3 + statediff/testhelpers/mocks/blockchain.go | 2 + 7 files changed, 162 insertions(+), 24 deletions(-) diff --git a/cmd/geth/config.go b/cmd/geth/config.go index 4e6b3a079..54575ecb2 100644 --- a/cmd/geth/config.go +++ b/cmd/geth/config.go @@ -155,6 +155,10 @@ func makeFullNode(ctx *cli.Context) *node.Node { if ctx.GlobalIsSet(utils.ConstantinopleOverrideFlag.Name) { cfg.Eth.ConstantinopleOverride = new(big.Int).SetUint64(ctx.GlobalUint64(utils.ConstantinopleOverrideFlag.Name)) } + if ctx.GlobalBool(utils.StateDiffFlag.Name) { + cfg.Eth.StateDiff = true + } + utils.RegisterEthService(stack, &cfg.Eth) if ctx.GlobalBool(utils.DashboardEnabledFlag.Name) { diff --git a/core/blockchain.go b/core/blockchain.go index 49aedf669..30153b5c3 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -71,10 +71,11 @@ const ( // CacheConfig contains the configuration values for the trie caching/pruning // that's resident in a blockchain. type CacheConfig struct { - Disabled bool // Whether to disable trie write caching (archive node) - TrieCleanLimit int // Memory allowance (MB) to use for caching trie nodes in memory - TrieDirtyLimit int // Memory limit (MB) at which to start flushing dirty trie nodes to disk - TrieTimeLimit time.Duration // Time limit after which to flush the current in-memory trie to disk + Disabled bool // Whether to disable trie write caching (archive node) + TrieCleanLimit int // Memory allowance (MB) to use for caching trie nodes in memory + TrieDirtyLimit int // Memory limit (MB) at which to start flushing dirty trie nodes to disk + TrieTimeLimit time.Duration // Time limit after which to flush the current in-memory trie to disk + ProcessStateDiffs bool } // BlockChain represents the canonical chain given a database with a genesis @@ -136,6 +137,8 @@ type BlockChain struct { badBlocks *lru.Cache // Bad block cache shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block. + + stateDiffsProcessed map[common.Hash]int } // NewBlockChain returns a fully initialised block chain using information @@ -155,24 +158,26 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par blockCache, _ := lru.New(blockCacheLimit) futureBlocks, _ := lru.New(maxFutureBlocks) badBlocks, _ := lru.New(badBlockLimit) - + stateDiffsProcessed := make(map[common.Hash]int) bc := &BlockChain{ - chainConfig: chainConfig, - cacheConfig: cacheConfig, - db: db, - triegc: prque.New(nil), - stateCache: state.NewDatabaseWithCache(db, cacheConfig.TrieCleanLimit), - quit: make(chan struct{}), - shouldPreserve: shouldPreserve, - bodyCache: bodyCache, - bodyRLPCache: bodyRLPCache, - receiptsCache: receiptsCache, - blockCache: blockCache, - futureBlocks: futureBlocks, - engine: engine, - vmConfig: vmConfig, - badBlocks: badBlocks, + chainConfig: chainConfig, + cacheConfig: cacheConfig, + db: db, + triegc: prque.New(nil), + stateCache: state.NewDatabaseWithCache(db, cacheConfig.TrieCleanLimit), + quit: make(chan struct{}), + shouldPreserve: shouldPreserve, + bodyCache: bodyCache, + bodyRLPCache: bodyRLPCache, + receiptsCache: receiptsCache, + blockCache: blockCache, + futureBlocks: futureBlocks, + engine: engine, + vmConfig: vmConfig, + badBlocks: badBlocks, + stateDiffsProcessed: stateDiffsProcessed, } + bc.SetValidator(NewBlockValidator(chainConfig, bc, engine)) bc.SetProcessor(NewStateProcessor(chainConfig, bc, engine)) @@ -922,6 +927,20 @@ func (bc *BlockChain) WriteBlockWithoutState(block *types.Block, td *big.Int) (e return nil } +func (bc *BlockChain) AddToStateDiffProcessedCollection(hash common.Hash) { + count, ok := bc.stateDiffsProcessed[hash] + if count > 1 { + log.Error("count is too high", "count", count, "hash", hash.Hex()) + } + + if ok { + count++ + bc.stateDiffsProcessed[hash] = count + } else { + bc.stateDiffsProcessed[hash] = 1 + } +} + // WriteBlockWithState writes the block and all associated state to the database. func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.Receipt, state *state.StateDB) (status WriteStatus, err error) { bc.wg.Add(1) @@ -994,6 +1013,30 @@ func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types. bc.triegc.Push(root, number) break } + + if bc.cacheConfig.ProcessStateDiffs { + count, ok := bc.stateDiffsProcessed[root.(common.Hash)] + //if we haven't processed the statediff for a given state root and it's child, don't dereference it yet + if !ok { + log.Info("Current root NOT found root in stateDiffsProcessed", "root", root.(common.Hash).Hex()) + bc.triegc.Push(root, number) + break + } + if count < 2 { + log.Info("Current root has not yet been processed for it's child", "root", root.(common.Hash).Hex()) + bc.triegc.Push(root, number) + break + } else { + log.Warn("Current root found in stateDiffsProcessed collection with a count of 2, okay to dereference", + "root", root.(common.Hash).Hex(), + "blockNumber", uint64(-number), + "size of stateDiffsProcessed", len(bc.stateDiffsProcessed)) + + delete(bc.stateDiffsProcessed, root.(common.Hash)) + } + } + + log.Info("DEREFERENCING", "root", root.(common.Hash).Hex()) triedb.Dereference(root.(common.Hash)) } } diff --git a/core/blockchain_test.go b/core/blockchain_test.go index 5ab29e205..5adb072ac 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -1483,3 +1483,84 @@ func BenchmarkBlockChain_1x1000Executions(b *testing.B) { benchmarkLargeNumberOfValueToNonexisting(b, numTxs, numBlocks, recipientFn, dataFn) } + +func TestProcessingStateDiffs(t *testing.T) { + defaultTrieCleanCache := 256 + defaultTrieDirtyCache := 256 + defaultTrieTimeout := 60 * time.Minute + cacheConfig := &CacheConfig{ + Disabled: false, + TrieCleanLimit: defaultTrieCleanCache, + TrieDirtyLimit: defaultTrieDirtyCache, + TrieTimeLimit: defaultTrieTimeout, + ProcessStateDiffs: true, + } + db := ethdb.NewMemDatabase() + genesis := new(Genesis).MustCommit(db) + numberOfBlocks := triesInMemory + engine := ethash.NewFaker() + blockchain, _ := NewBlockChain(db, cacheConfig, params.AllEthashProtocolChanges, engine, vm.Config{}, nil) + blocks := makeBlockChain(genesis, numberOfBlocks + 1, engine, db, canonicalSeed) + _, err := blockchain.InsertChain(blocks) + if err != nil { + t.Fatalf("failed to create pristine chain: %v", err) + } + defer blockchain.Stop() + + //when adding a root hash to the collection, it will increment the count + firstStateRoot := blocks[0].Root() + blockchain.AddToStateDiffProcessedCollection(firstStateRoot) + value, ok := blockchain.stateDiffsProcessed[firstStateRoot] + if !ok { + t.Error("state root not found in collection") + } + if value != 1 { + t.Error("state root count not correct", "want", 1, "got", value) + } + + blockchain.AddToStateDiffProcessedCollection(firstStateRoot) + value, ok = blockchain.stateDiffsProcessed[firstStateRoot] + if !ok { + t.Error("state root not found in collection") + } + if value != 2 { + t.Error("state root count not correct", "want", 2, "got", value) + } + + moreBlocks := makeBlockChain(blocks[len(blocks) - 1], 1, engine, db, canonicalSeed) + _, err = blockchain.InsertChain(moreBlocks) + + //a root hash can be dereferenced when it's state diff and it's child's state diff have been processed + //(i.e. it has a count of 2 in stateDiffsProcessed) + nodes := blockchain.stateCache.TrieDB().Nodes() + if containsRootHash(nodes, firstStateRoot) { + t.Errorf("stateRoot %s in nodes, want: %t, got: %t", firstStateRoot.Hex(), false, true) + } + + //a root hash should still be in the in-mem db if it's child's state diff hasn't yet been processed + //(i.e. it has a count of 1 stateDiffsProcessed) + secondStateRoot := blocks[1].Root() + blockchain.AddToStateDiffProcessedCollection(secondStateRoot) + if !containsRootHash(nodes, secondStateRoot) { + t.Errorf("stateRoot %s in nodes, want: %t, got: %t", secondStateRoot.Hex(), true, false) + } + + //the stateDiffsProcessed collection is cleaned up once a hash has been dereferenced + _, ok = blockchain.stateDiffsProcessed[firstStateRoot] + if ok { + t.Errorf("stateRoot %s in stateDiffsProcessed collection, want: %t, got: %t", + firstStateRoot.Hex(), + false, + ok, + ) + } +} + +func containsRootHash(collection []common.Hash, hash common.Hash) bool{ + for _, n := range collection { + if n == hash { + return true + } + } + return false +} diff --git a/eth/backend.go b/eth/backend.go index a0b274a38..7bd3332d5 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -157,10 +157,11 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) { EVMInterpreter: config.EVMInterpreter, } cacheConfig = &core.CacheConfig{ - Disabled: config.NoPruning, - TrieCleanLimit: config.TrieCleanCache, - TrieDirtyLimit: config.TrieDirtyCache, - TrieTimeLimit: config.TrieTimeout, + Disabled: config.NoPruning, + TrieCleanLimit: config.TrieCleanCache, + TrieDirtyLimit: config.TrieDirtyCache, + TrieTimeLimit: config.TrieTimeout, + ProcessStateDiffs: config.StateDiff, } ) eth.blockchain, err = core.NewBlockChain(chainDb, cacheConfig, eth.chainConfig, eth.engine, vmConfig, eth.shouldPreserve) diff --git a/eth/config.go b/eth/config.go index 7c041d1af..4d6e0fa32 100644 --- a/eth/config.go +++ b/eth/config.go @@ -59,6 +59,8 @@ var DefaultConfig = Config{ Blocks: 20, Percentile: 60, }, + + StateDiff: false, } func init() { @@ -135,6 +137,8 @@ type Config struct { // Constantinople block override (TODO: remove after the fork) ConstantinopleOverride *big.Int + + StateDiff bool } type configMarshaling struct { diff --git a/statediff/service/service.go b/statediff/service/service.go index 3e9fc61d1..19ea0c644 100644 --- a/statediff/service/service.go +++ b/statediff/service/service.go @@ -19,6 +19,7 @@ import ( type BlockChain interface { SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription GetBlockByHash(hash common.Hash) *types.Block + AddToStateDiffProcessedCollection(hash common.Hash) } type StateDiffService struct { @@ -94,6 +95,8 @@ HandleBlockChLoop: log.Error("Error extracting statediff", "block number", currentBlock.Number(), "error", err) } else { log.Info("Statediff extracted", "block number", currentBlock.Number(), "location", stateDiffLocation) + sds.BlockChain.AddToStateDiffProcessedCollection(parentBlock.Root()) + sds.BlockChain.AddToStateDiffProcessedCollection(currentBlock.Root()) } case <-quitCh: log.Debug("Quitting the statediff block channel") diff --git a/statediff/testhelpers/mocks/blockchain.go b/statediff/testhelpers/mocks/blockchain.go index f2d77ea34..41ceb0cad 100644 --- a/statediff/testhelpers/mocks/blockchain.go +++ b/statediff/testhelpers/mocks/blockchain.go @@ -16,6 +16,8 @@ type BlockChain struct { ChainEvents []core.ChainEvent } +func (mc *BlockChain) AddToStateDiffProcessedCollection(hash common.Hash) {} + func (mc *BlockChain) SetParentBlocksToReturn(blocks []*types.Block) { mc.parentBlocksToReturn = blocks }