if statediffing is on, lock tries in triedb until the statediffing service signals they are done using them

This commit is contained in:
Ian Norden 2020-08-10 10:18:33 -05:00
parent e004e9b94b
commit c94e07e2c0
6 changed files with 68 additions and 14 deletions

View File

@ -131,6 +131,9 @@ func makeConfigNode(ctx *cli.Context) (*node.Node, gethConfig) {
cfg.Ethstats.URL = ctx.GlobalString(utils.EthStatsURLFlag.Name)
}
utils.SetShhConfig(ctx, stack, &cfg.Shh)
if ctx.GlobalBool(utils.StateDiffFlag.Name) {
cfg.Eth.Diffing = true
}
return stack, cfg
}

View File

@ -43,7 +43,7 @@ import (
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie"
lru "github.com/hashicorp/golang-lru"
"github.com/hashicorp/golang-lru"
)
var (
@ -115,6 +115,7 @@ type CacheConfig struct {
TrieDirtyLimit int // Memory limit (MB) at which to start flushing dirty trie nodes to disk
TrieDirtyDisabled bool // Whether to disable trie write caching and GC altogether (archive node)
TrieTimeLimit time.Duration // Time limit after which to flush the current in-memory trie to disk
StateDiffing bool // Whether or not the statediffing service is running
}
// BlockChain represents the canonical chain given a database with a genesis
@ -177,6 +178,10 @@ type BlockChain struct {
badBlocks *lru.Cache // Bad block cache
shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block.
terminateInsert func(common.Hash, uint64) bool // Testing hook used to terminate ancient receipt chain insertion.
// Locked roots and their mutex
trieLock sync.Mutex
lockedRoots map[common.Hash]bool
}
// NewBlockChain returns a fully initialised block chain using information
@ -214,6 +219,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
engine: engine,
vmConfig: vmConfig,
badBlocks: badBlocks,
lockedRoots: make(map[common.Hash]bool),
}
bc.validator = NewBlockValidator(chainConfig, bc, engine)
bc.prefetcher = newStatePrefetcher(chainConfig, bc, engine)
@ -857,7 +863,10 @@ func (bc *BlockChain) Stop() {
}
}
for !bc.triegc.Empty() {
triedb.Dereference(bc.triegc.PopItem().(common.Hash))
pruneRoot := bc.triegc.PopItem().(common.Hash)
if !bc.TrieLocked(pruneRoot) {
triedb.Dereference(pruneRoot)
}
}
if size, _ := triedb.Size(); size != 0 {
log.Error("Dangling trie nodes after full cleanup")
@ -1342,6 +1351,11 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
triedb.Reference(root, common.Hash{}) // metadata reference to keep trie alive
bc.triegc.Push(root, -int64(block.NumberU64()))
// If we are statediffing, lock the trie until the statediffing service is done using it
if bc.cacheConfig.StateDiffing {
bc.LockTrie(root)
}
if current := block.NumberU64(); current > TriesInMemory {
// If we exceeded our memory allowance, flush matured singleton nodes to disk
var (
@ -1380,8 +1394,11 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
bc.triegc.Push(root, number)
break
}
log.Debug("Dereferencing", "root", root.(common.Hash).Hex())
triedb.Dereference(root.(common.Hash))
pruneRoot := root.(common.Hash)
if !bc.TrieLocked(pruneRoot) {
log.Debug("Dereferencing", "root", root.(common.Hash).Hex())
triedb.Dereference(pruneRoot)
}
}
}
}
@ -2254,3 +2271,30 @@ func (bc *BlockChain) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscript
func (bc *BlockChain) SubscribeBlockProcessingEvent(ch chan<- bool) event.Subscription {
return bc.scope.Track(bc.blockProcFeed.Subscribe(ch))
}
// TrieLocked returns whether the trie associated with the provided root is locked for use
func (bc *BlockChain) TrieLocked(root common.Hash) bool {
bc.trieLock.Lock()
locked, ok := bc.lockedRoots[root]
bc.trieLock.Unlock()
if !ok {
return false
}
return locked
}
// LockTrie prevents dereferencing of the provided root
func (bc *BlockChain) LockTrie(root common.Hash) {
bc.trieLock.Lock()
bc.lockedRoots[root] = true
bc.trieLock.Unlock()
return
}
// UnlockTrie allows dereferencing of the provided root- provided it was previously locked
func (bc *BlockChain) UnlockTrie(root common.Hash) {
bc.trieLock.Lock()
bc.lockedRoots[root] = false
bc.trieLock.Unlock()
return
}

View File

@ -184,6 +184,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
TrieDirtyLimit: config.TrieDirtyCache,
TrieDirtyDisabled: config.NoPruning,
TrieTimeLimit: config.TrieTimeout,
StateDiffing: config.Diffing,
}
)
eth.blockchain, err = core.NewBlockChain(chainDb, cacheConfig, chainConfig, eth.engine, vmConfig, eth.shouldPreserve)

View File

@ -164,4 +164,8 @@ type Config struct {
// MuirGlacier block override (TODO: remove after the fork)
OverrideMuirGlacier *big.Int `toml:",omitempty"`
// Signify whether or not we are producing statediffs
// If we are, do not dereference state roots until the statediffing service is done with them
Diffing bool
}

View File

@ -128,7 +128,7 @@ func (sdb *builder) buildStateTrie(it trie.NodeIterator) ([]StateNode, error) {
return nil, fmt.Errorf("unexpected node type %s", ty)
}
}
return stateNodes, nil
return stateNodes, it.Error()
}
// BuildStateDiffObject builds a statediff object from two blocks and the provided parameters
@ -301,7 +301,7 @@ func (sdb *builder) createdAndUpdatedState(a, b trie.NodeIterator, watchedAddres
// add both intermediate and leaf node paths to the list of diffPathsAtB
diffPathsAtB[common.Bytes2Hex(nodePath)] = true
}
return diffAcountsAtB, diffPathsAtB, nil
return diffAcountsAtB, diffPathsAtB, it.Error()
}
// createdAndUpdatedStateWithIntermediateNodes returns
@ -368,7 +368,7 @@ func (sdb *builder) createdAndUpdatedStateWithIntermediateNodes(a, b trie.NodeIt
// add both intermediate and leaf node paths to the list of diffPathsAtB
diffPathsAtB[common.Bytes2Hex(nodePath)] = true
}
return createdOrUpdatedIntermediateNodes, diffAcountsAtB, diffPathsAtB, nil
return createdOrUpdatedIntermediateNodes, diffAcountsAtB, diffPathsAtB, it.Error()
}
// deletedOrUpdatedState returns a slice of all the pathes that are emptied at B
@ -433,7 +433,7 @@ func (sdb *builder) deletedOrUpdatedState(a, b trie.NodeIterator, diffPathsAtB m
return nil, nil, fmt.Errorf("unexpected node type %s", ty)
}
}
return emptiedPaths, diffAccountAtA, nil
return emptiedPaths, diffAccountAtA, it.Error()
}
// buildAccountUpdates uses the account diffs maps for A => B and B => A and the known intersection of their leafkeys
@ -559,7 +559,7 @@ func (sdb *builder) buildStorageNodesFromTrie(it trie.NodeIterator, watchedStora
return nil, fmt.Errorf("unexpected node type %s", ty)
}
}
return storageDiffs, nil
return storageDiffs, it.Error()
}
// buildStorageNodesIncremental builds the storage diff node objects for all nodes that exist in a different state at B than A
@ -641,7 +641,7 @@ func (sdb *builder) createdAndUpdatedStorage(a, b trie.NodeIterator, watchedKeys
}
diffPathsAtB[common.Bytes2Hex(nodePath)] = true
}
return createdOrUpdatedStorage, diffPathsAtB, nil
return createdOrUpdatedStorage, diffPathsAtB, it.Error()
}
func (sdb *builder) deletedOrUpdatedStorage(a, b trie.NodeIterator, diffPathsAtB map[string]bool, watchedKeys []common.Hash, intermediateNodes bool) ([]StorageNode, error) {
@ -700,7 +700,7 @@ func (sdb *builder) deletedOrUpdatedStorage(a, b trie.NodeIterator, diffPathsAtB
return nil, fmt.Errorf("unexpected node type %s", ty)
}
}
return deletedStorage, nil
return deletedStorage, it.Error()
}
// isWatchedAddress is used to check if a state account corresponds to one of the addresses the builder is configured to watch

View File

@ -44,6 +44,7 @@ type blockChain interface {
GetBlockByNumber(number uint64) *types.Block
GetReceiptsByHash(hash common.Hash) types.Receipts
GetTdByHash(hash common.Hash) *big.Int
UnlockTrie(root common.Hash)
}
// IService is the state-diffing service interface
@ -53,7 +54,7 @@ type IService interface {
// Main event loop for processing state diffs
Loop(chainEventCh chan core.ChainEvent)
// Method to subscribe to receive state diff processing output
Subscribe(id rpc.ID, sub chan<- Payload, quitChan chan<- bool, params Params)
Subscribe(id rpc.ID, sub chan<- Payload, quitChanogr chan<- bool, params Params)
// Method to unsubscribe from state diff processing
Unsubscribe(id rpc.ID) error
// Method to get state diff object at specific block
@ -165,8 +166,7 @@ func (sds *Service) streamStateDiff(currentBlock *types.Block, parentRoot common
// create payload for this subscription type
payload, err := sds.processStateDiff(currentBlock, parentRoot, params)
if err != nil {
log.Error(fmt.Sprintf("statediff processing error for subscriptions with parameters: %+v", params))
sds.closeType(ty)
log.Error(fmt.Sprintf("statediff processing error a blockheight %d for subscriptions with parameters: %+v err: %s", currentBlock.Number().Uint64(), params, err.Error()))
continue
}
for id, sub := range subs {
@ -201,6 +201,8 @@ func (sds *Service) processStateDiff(currentBlock *types.Block, parentRoot commo
BlockHash: currentBlock.Hash(),
BlockNumber: currentBlock.Number(),
}, params)
// allow dereferencing of parent, keep current locked as it should be the next parent
sds.BlockChain.UnlockTrie(parentRoot)
if err != nil {
return nil, err
}