diff --git a/core/blockchain.go b/core/blockchain.go index f9b92192c..e5fd58610 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -169,12 +169,14 @@ type BlockChain struct { chainConfig *params.ChainConfig // Chain & network configuration cacheConfig *CacheConfig // Cache configuration for pruning - db ethdb.Database // Low level persistent database to store final content in - snaps *snapshot.Tree // Snapshot tree for fast trie leaf access - triegc *prque.Prque // Priority queue mapping block numbers to tries to gc - gcproc time.Duration // Accumulates canonical block processing for trie dumping - triedb *trie.Database // The database handler for maintaining trie nodes. - stateCache state.Database // State database to reuse between imports (contains state cache) + db ethdb.Database // Low level persistent database to store final content in + snaps *snapshot.Tree // Snapshot tree for fast trie leaf access + triegc *prque.Prque // Priority queue mapping block numbers to tries to gc + gcproc time.Duration // Accumulates canonical block processing for trie dumping + lastWrite uint64 // Last block when the state was flushed + flushInterval int64 // Time interval (processing time) after which to flush a state + triedb *trie.Database // The database handler for maintaining trie nodes. + stateCache state.Database // State database to reuse between imports (contains state cache) // txLookupLimit is the maximum number of blocks from head whose tx indices // are reserved: @@ -258,6 +260,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis cacheConfig: cacheConfig, db: db, triedb: triedb, + flushInterval: int64(cacheConfig.TrieTimeLimit), triegc: prque.New(nil), quit: make(chan struct{}), chainmu: syncx.NewClosableMutex(), @@ -1248,8 +1251,6 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ return 0, nil } -var lastWrite uint64 - // writeBlockWithoutState writes only the block and its metadata to the database, // but does not write any state. This is used to construct competing side forks // up to the point where they exceed the canonical total difficulty. @@ -1311,53 +1312,55 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. // If we're running an archive node, always flush if bc.cacheConfig.TrieDirtyDisabled { return bc.triedb.Commit(root, false, nil) - } else { - // Full but not archive node, do proper garbage collection - bc.triedb.Reference(root, common.Hash{}) // metadata reference to keep trie alive - bc.triegc.Push(root, -int64(block.NumberU64())) + } + // Full but not archive node, do proper garbage collection + bc.triedb.Reference(root, common.Hash{}) // metadata reference to keep trie alive + bc.triegc.Push(root, -int64(block.NumberU64())) - if current := block.NumberU64(); current > TriesInMemory { - // If we exceeded our memory allowance, flush matured singleton nodes to disk - var ( - nodes, imgs = bc.triedb.Size() - limit = common.StorageSize(bc.cacheConfig.TrieDirtyLimit) * 1024 * 1024 - ) - if nodes > limit || imgs > 4*1024*1024 { - bc.triedb.Cap(limit - ethdb.IdealBatchSize) - } - // Find the next state trie we need to commit - chosen := current - TriesInMemory - - // If we exceeded out time allowance, flush an entire trie to disk - if bc.gcproc > bc.cacheConfig.TrieTimeLimit { - // If the header is missing (canonical chain behind), we're reorging a low - // diff sidechain. Suspend committing until this operation is completed. - header := bc.GetHeaderByNumber(chosen) - if header == nil { - log.Warn("Reorg in progress, trie commit postponed", "number", chosen) - } else { - // If we're exceeding limits but haven't reached a large enough memory gap, - // warn the user that the system is becoming unstable. - if chosen < lastWrite+TriesInMemory && bc.gcproc >= 2*bc.cacheConfig.TrieTimeLimit { - log.Info("State in memory for too long, committing", "time", bc.gcproc, "allowance", bc.cacheConfig.TrieTimeLimit, "optimum", float64(chosen-lastWrite)/TriesInMemory) - } - // Flush an entire trie and restart the counters - bc.triedb.Commit(header.Root, true, nil) - lastWrite = chosen - bc.gcproc = 0 - } - } - // Garbage collect anything below our required write retention - for !bc.triegc.Empty() { - root, number := bc.triegc.Pop() - if uint64(-number) > chosen { - bc.triegc.Push(root, number) - break - } - bc.triedb.Dereference(root.(common.Hash)) + current := block.NumberU64() + // Flush limits are not considered for the first TriesInMemory blocks. + if current <= TriesInMemory { + return nil + } + // If we exceeded our memory allowance, flush matured singleton nodes to disk + var ( + nodes, imgs = bc.triedb.Size() + limit = common.StorageSize(bc.cacheConfig.TrieDirtyLimit) * 1024 * 1024 + ) + if nodes > limit || imgs > 4*1024*1024 { + bc.triedb.Cap(limit - ethdb.IdealBatchSize) + } + // Find the next state trie we need to commit + chosen := current - TriesInMemory + flushInterval := time.Duration(atomic.LoadInt64(&bc.flushInterval)) + // If we exceeded time allowance, flush an entire trie to disk + if bc.gcproc > flushInterval { + // If the header is missing (canonical chain behind), we're reorging a low + // diff sidechain. Suspend committing until this operation is completed. + header := bc.GetHeaderByNumber(chosen) + if header == nil { + log.Warn("Reorg in progress, trie commit postponed", "number", chosen) + } else { + // If we're exceeding limits but haven't reached a large enough memory gap, + // warn the user that the system is becoming unstable. + if chosen < bc.lastWrite+TriesInMemory && bc.gcproc >= 2*flushInterval { + log.Info("State in memory for too long, committing", "time", bc.gcproc, "allowance", flushInterval, "optimum", float64(chosen-bc.lastWrite)/TriesInMemory) } + // Flush an entire trie and restart the counters + bc.triedb.Commit(header.Root, true, nil) + bc.lastWrite = chosen + bc.gcproc = 0 } } + // Garbage collect anything below our required write retention + for !bc.triegc.Empty() { + root, number := bc.triegc.Pop() + if uint64(-number) > chosen { + bc.triegc.Push(root, number) + break + } + bc.triedb.Dereference(root.(common.Hash)) + } return nil } @@ -2436,3 +2439,10 @@ func (bc *BlockChain) SetBlockValidatorAndProcessorForTesting(v Validator, p Pro bc.validator = v bc.processor = p } + +// SetTrieFlushInterval configures how often in-memory tries are persisted to disk. +// The interval is in terms of block processing time, not wall clock. +// It is thread-safe and can be called repeatedly without side effects. +func (bc *BlockChain) SetTrieFlushInterval(interval time.Duration) { + atomic.StoreInt64(&bc.flushInterval, int64(interval)) +} diff --git a/eth/api.go b/eth/api.go index e480dde8f..2bfa60a8e 100644 --- a/eth/api.go +++ b/eth/api.go @@ -590,3 +590,14 @@ func (api *DebugAPI) GetAccessibleState(from, to rpc.BlockNumber) (uint64, error } return 0, errors.New("no state found") } + +// SetTrieFlushInterval configures how often in-memory tries are persisted +// to disk. The value is in terms of block processing time, not wall clock. +func (api *DebugAPI) SetTrieFlushInterval(interval string) error { + t, err := time.ParseDuration(interval) + if err != nil { + return err + } + api.eth.blockchain.SetTrieFlushInterval(t) + return nil +} diff --git a/internal/web3ext/web3ext.go b/internal/web3ext/web3ext.go index 801afedaa..388eed8a2 100644 --- a/internal/web3ext/web3ext.go +++ b/internal/web3ext/web3ext.go @@ -490,6 +490,11 @@ web3._extend({ call: 'debug_dbAncients', params: 0 }), + new web3._extend.Method({ + name: 'setTrieFlushInterval', + call: 'debug_setTrieFlushInterval', + params: 1 + }), ], properties: [] });