diff --git a/cmd/geth/config.go b/cmd/geth/config.go index d49c8f21f..dceda0226 100644 --- a/cmd/geth/config.go +++ b/cmd/geth/config.go @@ -131,6 +131,9 @@ func makeConfigNode(ctx *cli.Context) (*node.Node, gethConfig) { cfg.Ethstats.URL = ctx.GlobalString(utils.EthStatsURLFlag.Name) } utils.SetShhConfig(ctx, stack, &cfg.Shh) + if ctx.GlobalBool(utils.StateDiffFlag.Name) { + cfg.Eth.Diffing = true + } return stack, cfg } diff --git a/core/blockchain.go b/core/blockchain.go index 47187b097..bd17cf5b7 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -43,7 +43,7 @@ import ( "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/trie" - lru "github.com/hashicorp/golang-lru" + "github.com/hashicorp/golang-lru" ) var ( @@ -115,6 +115,7 @@ type CacheConfig struct { TrieDirtyLimit int // Memory limit (MB) at which to start flushing dirty trie nodes to disk TrieDirtyDisabled bool // Whether to disable trie write caching and GC altogether (archive node) TrieTimeLimit time.Duration // Time limit after which to flush the current in-memory trie to disk + StateDiffing bool // Whether or not the statediffing service is running } // BlockChain represents the canonical chain given a database with a genesis @@ -177,6 +178,10 @@ type BlockChain struct { badBlocks *lru.Cache // Bad block cache shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block. terminateInsert func(common.Hash, uint64) bool // Testing hook used to terminate ancient receipt chain insertion. + + // Locked roots and their mutex + trieLock sync.Mutex + lockedRoots map[common.Hash]bool } // NewBlockChain returns a fully initialised block chain using information @@ -214,6 +219,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par engine: engine, vmConfig: vmConfig, badBlocks: badBlocks, + lockedRoots: make(map[common.Hash]bool), } bc.validator = NewBlockValidator(chainConfig, bc, engine) bc.prefetcher = newStatePrefetcher(chainConfig, bc, engine) @@ -857,7 +863,10 @@ func (bc *BlockChain) Stop() { } } for !bc.triegc.Empty() { - triedb.Dereference(bc.triegc.PopItem().(common.Hash)) + pruneRoot := bc.triegc.PopItem().(common.Hash) + if !bc.TrieLocked(pruneRoot) { + triedb.Dereference(pruneRoot) + } } if size, _ := triedb.Size(); size != 0 { log.Error("Dangling trie nodes after full cleanup") @@ -1342,6 +1351,11 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. triedb.Reference(root, common.Hash{}) // metadata reference to keep trie alive bc.triegc.Push(root, -int64(block.NumberU64())) + // If we are statediffing, lock the trie until the statediffing service is done using it + if bc.cacheConfig.StateDiffing { + bc.LockTrie(root) + } + if current := block.NumberU64(); current > TriesInMemory { // If we exceeded our memory allowance, flush matured singleton nodes to disk var ( @@ -1380,8 +1394,11 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. bc.triegc.Push(root, number) break } - log.Debug("Dereferencing", "root", root.(common.Hash).Hex()) - triedb.Dereference(root.(common.Hash)) + pruneRoot := root.(common.Hash) + if !bc.TrieLocked(pruneRoot) { + log.Debug("Dereferencing", "root", root.(common.Hash).Hex()) + triedb.Dereference(pruneRoot) + } } } } @@ -2254,3 +2271,30 @@ func (bc *BlockChain) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscript func (bc *BlockChain) SubscribeBlockProcessingEvent(ch chan<- bool) event.Subscription { return bc.scope.Track(bc.blockProcFeed.Subscribe(ch)) } + +// TrieLocked returns whether the trie associated with the provided root is locked for use +func (bc *BlockChain) TrieLocked(root common.Hash) bool { + bc.trieLock.Lock() + locked, ok := bc.lockedRoots[root] + bc.trieLock.Unlock() + if !ok { + return false + } + return locked +} + +// LockTrie prevents dereferencing of the provided root +func (bc *BlockChain) LockTrie(root common.Hash) { + bc.trieLock.Lock() + bc.lockedRoots[root] = true + bc.trieLock.Unlock() + return +} + +// UnlockTrie allows dereferencing of the provided root- provided it was previously locked +func (bc *BlockChain) UnlockTrie(root common.Hash) { + bc.trieLock.Lock() + bc.lockedRoots[root] = false + bc.trieLock.Unlock() + return +} diff --git a/eth/backend.go b/eth/backend.go index bda307d95..6df43e923 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -184,6 +184,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) { TrieDirtyLimit: config.TrieDirtyCache, TrieDirtyDisabled: config.NoPruning, TrieTimeLimit: config.TrieTimeout, + StateDiffing: config.Diffing, } ) eth.blockchain, err = core.NewBlockChain(chainDb, cacheConfig, chainConfig, eth.engine, vmConfig, eth.shouldPreserve) diff --git a/eth/config.go b/eth/config.go index 2eaf21fbc..7d3e6ce48 100644 --- a/eth/config.go +++ b/eth/config.go @@ -164,4 +164,8 @@ type Config struct { // MuirGlacier block override (TODO: remove after the fork) OverrideMuirGlacier *big.Int `toml:",omitempty"` + + // Signify whether or not we are producing statediffs + // If we are, do not dereference state roots until the statediffing service is done with them + Diffing bool } diff --git a/statediff/builder.go b/statediff/builder.go index 52a0de87a..90d1d4277 100644 --- a/statediff/builder.go +++ b/statediff/builder.go @@ -128,7 +128,7 @@ func (sdb *builder) buildStateTrie(it trie.NodeIterator) ([]StateNode, error) { return nil, fmt.Errorf("unexpected node type %s", ty) } } - return stateNodes, nil + return stateNodes, it.Error() } // BuildStateDiffObject builds a statediff object from two blocks and the provided parameters @@ -301,7 +301,7 @@ func (sdb *builder) createdAndUpdatedState(a, b trie.NodeIterator, watchedAddres // add both intermediate and leaf node paths to the list of diffPathsAtB diffPathsAtB[common.Bytes2Hex(nodePath)] = true } - return diffAcountsAtB, diffPathsAtB, nil + return diffAcountsAtB, diffPathsAtB, it.Error() } // createdAndUpdatedStateWithIntermediateNodes returns @@ -368,7 +368,7 @@ func (sdb *builder) createdAndUpdatedStateWithIntermediateNodes(a, b trie.NodeIt // add both intermediate and leaf node paths to the list of diffPathsAtB diffPathsAtB[common.Bytes2Hex(nodePath)] = true } - return createdOrUpdatedIntermediateNodes, diffAcountsAtB, diffPathsAtB, nil + return createdOrUpdatedIntermediateNodes, diffAcountsAtB, diffPathsAtB, it.Error() } // deletedOrUpdatedState returns a slice of all the pathes that are emptied at B @@ -433,7 +433,7 @@ func (sdb *builder) deletedOrUpdatedState(a, b trie.NodeIterator, diffPathsAtB m return nil, nil, fmt.Errorf("unexpected node type %s", ty) } } - return emptiedPaths, diffAccountAtA, nil + return emptiedPaths, diffAccountAtA, it.Error() } // buildAccountUpdates uses the account diffs maps for A => B and B => A and the known intersection of their leafkeys @@ -559,7 +559,7 @@ func (sdb *builder) buildStorageNodesFromTrie(it trie.NodeIterator, watchedStora return nil, fmt.Errorf("unexpected node type %s", ty) } } - return storageDiffs, nil + return storageDiffs, it.Error() } // buildStorageNodesIncremental builds the storage diff node objects for all nodes that exist in a different state at B than A @@ -641,7 +641,7 @@ func (sdb *builder) createdAndUpdatedStorage(a, b trie.NodeIterator, watchedKeys } diffPathsAtB[common.Bytes2Hex(nodePath)] = true } - return createdOrUpdatedStorage, diffPathsAtB, nil + return createdOrUpdatedStorage, diffPathsAtB, it.Error() } func (sdb *builder) deletedOrUpdatedStorage(a, b trie.NodeIterator, diffPathsAtB map[string]bool, watchedKeys []common.Hash, intermediateNodes bool) ([]StorageNode, error) { @@ -700,7 +700,7 @@ func (sdb *builder) deletedOrUpdatedStorage(a, b trie.NodeIterator, diffPathsAtB return nil, fmt.Errorf("unexpected node type %s", ty) } } - return deletedStorage, nil + return deletedStorage, it.Error() } // isWatchedAddress is used to check if a state account corresponds to one of the addresses the builder is configured to watch diff --git a/statediff/service.go b/statediff/service.go index 27bd2b105..5f37cfc0c 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -44,6 +44,7 @@ type blockChain interface { GetBlockByNumber(number uint64) *types.Block GetReceiptsByHash(hash common.Hash) types.Receipts GetTdByHash(hash common.Hash) *big.Int + UnlockTrie(root common.Hash) } // IService is the state-diffing service interface @@ -53,7 +54,7 @@ type IService interface { // Main event loop for processing state diffs Loop(chainEventCh chan core.ChainEvent) // Method to subscribe to receive state diff processing output - Subscribe(id rpc.ID, sub chan<- Payload, quitChan chan<- bool, params Params) + Subscribe(id rpc.ID, sub chan<- Payload, quitChanogr chan<- bool, params Params) // Method to unsubscribe from state diff processing Unsubscribe(id rpc.ID) error // Method to get state diff object at specific block @@ -165,8 +166,7 @@ func (sds *Service) streamStateDiff(currentBlock *types.Block, parentRoot common // create payload for this subscription type payload, err := sds.processStateDiff(currentBlock, parentRoot, params) if err != nil { - log.Error(fmt.Sprintf("statediff processing error for subscriptions with parameters: %+v", params)) - sds.closeType(ty) + log.Error(fmt.Sprintf("statediff processing error a blockheight %d for subscriptions with parameters: %+v err: %s", currentBlock.Number().Uint64(), params, err.Error())) continue } for id, sub := range subs { @@ -201,6 +201,8 @@ func (sds *Service) processStateDiff(currentBlock *types.Block, parentRoot commo BlockHash: currentBlock.Hash(), BlockNumber: currentBlock.Number(), }, params) + // allow dereferencing of parent, keep current locked as it should be the next parent + sds.BlockChain.UnlockTrie(parentRoot) if err != nil { return nil, err }