Patch for concurrent iterator & others (onto v1.11.6) #386

Closed
roysc wants to merge 1565 commits from v1.11.6-statediff-v5 into master
3 changed files with 77 additions and 51 deletions
Showing only changes of commit 711afbc7fd - Show all commits

View File

@ -169,12 +169,14 @@ type BlockChain struct {
chainConfig *params.ChainConfig // Chain & network configuration chainConfig *params.ChainConfig // Chain & network configuration
cacheConfig *CacheConfig // Cache configuration for pruning cacheConfig *CacheConfig // Cache configuration for pruning
db ethdb.Database // Low level persistent database to store final content in db ethdb.Database // Low level persistent database to store final content in
snaps *snapshot.Tree // Snapshot tree for fast trie leaf access snaps *snapshot.Tree // Snapshot tree for fast trie leaf access
triegc *prque.Prque // Priority queue mapping block numbers to tries to gc triegc *prque.Prque // Priority queue mapping block numbers to tries to gc
gcproc time.Duration // Accumulates canonical block processing for trie dumping gcproc time.Duration // Accumulates canonical block processing for trie dumping
triedb *trie.Database // The database handler for maintaining trie nodes. lastWrite uint64 // Last block when the state was flushed
stateCache state.Database // State database to reuse between imports (contains state cache) flushInterval int64 // Time interval (processing time) after which to flush a state
triedb *trie.Database // The database handler for maintaining trie nodes.
stateCache state.Database // State database to reuse between imports (contains state cache)
// txLookupLimit is the maximum number of blocks from head whose tx indices // txLookupLimit is the maximum number of blocks from head whose tx indices
// are reserved: // are reserved:
@ -258,6 +260,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
cacheConfig: cacheConfig, cacheConfig: cacheConfig,
db: db, db: db,
triedb: triedb, triedb: triedb,
flushInterval: int64(cacheConfig.TrieTimeLimit),
triegc: prque.New(nil), triegc: prque.New(nil),
quit: make(chan struct{}), quit: make(chan struct{}),
chainmu: syncx.NewClosableMutex(), chainmu: syncx.NewClosableMutex(),
@ -1248,8 +1251,6 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
return 0, nil return 0, nil
} }
var lastWrite uint64
// writeBlockWithoutState writes only the block and its metadata to the database, // writeBlockWithoutState writes only the block and its metadata to the database,
// but does not write any state. This is used to construct competing side forks // but does not write any state. This is used to construct competing side forks
// up to the point where they exceed the canonical total difficulty. // up to the point where they exceed the canonical total difficulty.
@ -1311,53 +1312,55 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
// If we're running an archive node, always flush // If we're running an archive node, always flush
if bc.cacheConfig.TrieDirtyDisabled { if bc.cacheConfig.TrieDirtyDisabled {
return bc.triedb.Commit(root, false, nil) return bc.triedb.Commit(root, false, nil)
} else { }
// Full but not archive node, do proper garbage collection // Full but not archive node, do proper garbage collection
bc.triedb.Reference(root, common.Hash{}) // metadata reference to keep trie alive bc.triedb.Reference(root, common.Hash{}) // metadata reference to keep trie alive
bc.triegc.Push(root, -int64(block.NumberU64())) bc.triegc.Push(root, -int64(block.NumberU64()))
if current := block.NumberU64(); current > TriesInMemory { current := block.NumberU64()
// If we exceeded our memory allowance, flush matured singleton nodes to disk // Flush limits are not considered for the first TriesInMemory blocks.
var ( if current <= TriesInMemory {
nodes, imgs = bc.triedb.Size() return nil
limit = common.StorageSize(bc.cacheConfig.TrieDirtyLimit) * 1024 * 1024 }
) // If we exceeded our memory allowance, flush matured singleton nodes to disk
if nodes > limit || imgs > 4*1024*1024 { var (
bc.triedb.Cap(limit - ethdb.IdealBatchSize) nodes, imgs = bc.triedb.Size()
} limit = common.StorageSize(bc.cacheConfig.TrieDirtyLimit) * 1024 * 1024
// Find the next state trie we need to commit )
chosen := current - TriesInMemory if nodes > limit || imgs > 4*1024*1024 {
bc.triedb.Cap(limit - ethdb.IdealBatchSize)
// If we exceeded out time allowance, flush an entire trie to disk }
if bc.gcproc > bc.cacheConfig.TrieTimeLimit { // Find the next state trie we need to commit
// If the header is missing (canonical chain behind), we're reorging a low chosen := current - TriesInMemory
// diff sidechain. Suspend committing until this operation is completed. flushInterval := time.Duration(atomic.LoadInt64(&bc.flushInterval))
header := bc.GetHeaderByNumber(chosen) // If we exceeded time allowance, flush an entire trie to disk
if header == nil { if bc.gcproc > flushInterval {
log.Warn("Reorg in progress, trie commit postponed", "number", chosen) // If the header is missing (canonical chain behind), we're reorging a low
} else { // diff sidechain. Suspend committing until this operation is completed.
// If we're exceeding limits but haven't reached a large enough memory gap, header := bc.GetHeaderByNumber(chosen)
// warn the user that the system is becoming unstable. if header == nil {
if chosen < lastWrite+TriesInMemory && bc.gcproc >= 2*bc.cacheConfig.TrieTimeLimit { log.Warn("Reorg in progress, trie commit postponed", "number", chosen)
log.Info("State in memory for too long, committing", "time", bc.gcproc, "allowance", bc.cacheConfig.TrieTimeLimit, "optimum", float64(chosen-lastWrite)/TriesInMemory) } else {
} // If we're exceeding limits but haven't reached a large enough memory gap,
// Flush an entire trie and restart the counters // warn the user that the system is becoming unstable.
bc.triedb.Commit(header.Root, true, nil) if chosen < bc.lastWrite+TriesInMemory && bc.gcproc >= 2*flushInterval {
lastWrite = chosen log.Info("State in memory for too long, committing", "time", bc.gcproc, "allowance", flushInterval, "optimum", float64(chosen-bc.lastWrite)/TriesInMemory)
bc.gcproc = 0
}
}
// Garbage collect anything below our required write retention
for !bc.triegc.Empty() {
root, number := bc.triegc.Pop()
if uint64(-number) > chosen {
bc.triegc.Push(root, number)
break
}
bc.triedb.Dereference(root.(common.Hash))
} }
// Flush an entire trie and restart the counters
bc.triedb.Commit(header.Root, true, nil)
bc.lastWrite = chosen
bc.gcproc = 0
} }
} }
// Garbage collect anything below our required write retention
for !bc.triegc.Empty() {
root, number := bc.triegc.Pop()
if uint64(-number) > chosen {
bc.triegc.Push(root, number)
break
}
bc.triedb.Dereference(root.(common.Hash))
}
return nil return nil
} }
@ -2436,3 +2439,10 @@ func (bc *BlockChain) SetBlockValidatorAndProcessorForTesting(v Validator, p Pro
bc.validator = v bc.validator = v
bc.processor = p bc.processor = p
} }
// SetTrieFlushInterval configures how often in-memory tries are persisted to disk.
// The interval is in terms of block processing time, not wall clock.
// It is thread-safe and can be called repeatedly without side effects.
func (bc *BlockChain) SetTrieFlushInterval(interval time.Duration) {
atomic.StoreInt64(&bc.flushInterval, int64(interval))
}

View File

@ -590,3 +590,14 @@ func (api *DebugAPI) GetAccessibleState(from, to rpc.BlockNumber) (uint64, error
} }
return 0, errors.New("no state found") return 0, errors.New("no state found")
} }
// SetTrieFlushInterval configures how often in-memory tries are persisted
// to disk. The value is in terms of block processing time, not wall clock.
func (api *DebugAPI) SetTrieFlushInterval(interval string) error {
t, err := time.ParseDuration(interval)
if err != nil {
return err
}
api.eth.blockchain.SetTrieFlushInterval(t)
return nil
}

View File

@ -490,6 +490,11 @@ web3._extend({
call: 'debug_dbAncients', call: 'debug_dbAncients',
params: 0 params: 0
}), }),
new web3._extend.Method({
name: 'setTrieFlushInterval',
call: 'debug_setTrieFlushInterval',
params: 1
}),
], ],
properties: [] properties: []
}); });