From 949cee2fe343532a2fb9cd4a5edebfedae86a371 Mon Sep 17 00:00:00 2001 From: s7v7nislands Date: Thu, 30 Mar 2023 18:53:32 +0800 Subject: [PATCH] core: use atomic type (#27011) --- core/blockchain.go | 28 ++++++++++++++-------------- core/chain_indexer.go | 6 +++--- core/state_prefetcher.go | 4 ++-- core/types.go | 4 +++- 4 files changed, 22 insertions(+), 20 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index 8fc520e77..d7094c516 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -174,7 +174,7 @@ type BlockChain struct { triegc *prque.Prque[int64, common.Hash] // Priority queue mapping block numbers to tries to gc gcproc time.Duration // Accumulates canonical block processing for trie dumping lastWrite uint64 // Last block when the state was flushed - flushInterval int64 // Time interval (processing time) after which to flush a state + flushInterval atomic.Int64 // Time interval (processing time) after which to flush a state triedb *trie.Database // The database handler for maintaining trie nodes. stateCache state.Database // State database to reuse between imports (contains state cache) @@ -215,8 +215,8 @@ type BlockChain struct { wg sync.WaitGroup // quit chan struct{} // shutdown signal, closed in Stop. - running int32 // 0 if chain is running, 1 when stopped - procInterrupt int32 // interrupt signaler for block processing + stopping atomic.Bool // false if chain is running, true when stopped + procInterrupt atomic.Bool // interrupt signaler for block processing engine consensus.Engine validator Validator // Block and state validator interface @@ -260,7 +260,6 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis cacheConfig: cacheConfig, db: db, triedb: triedb, - flushInterval: int64(cacheConfig.TrieTimeLimit), triegc: prque.New[int64, common.Hash](nil), quit: make(chan struct{}), chainmu: syncx.NewClosableMutex(), @@ -273,6 +272,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis engine: engine, vmConfig: vmConfig, } + bc.flushInterval.Store(int64(cacheConfig.TrieTimeLimit)) bc.forker = NewForkChoice(bc, shouldPreserve) bc.stateCache = state.NewDatabaseWithNodeDB(bc.db, bc.triedb) bc.validator = NewBlockValidator(chainConfig, bc, engine) @@ -916,7 +916,7 @@ func (bc *BlockChain) writeHeadBlock(block *types.Block) { // This method has been exposed to allow tests to stop the blockchain while simulating // a crash. func (bc *BlockChain) stopWithoutSaving() { - if !atomic.CompareAndSwapInt32(&bc.running, 0, 1) { + if !bc.stopping.CompareAndSwap(false, true) { return } @@ -998,12 +998,12 @@ func (bc *BlockChain) Stop() { // errInsertionInterrupted as soon as possible. Insertion is permanently disabled after // calling this method. func (bc *BlockChain) StopInsert() { - atomic.StoreInt32(&bc.procInterrupt, 1) + bc.procInterrupt.Store(true) } // insertStopped returns true after StopInsert has been called. func (bc *BlockChain) insertStopped() bool { - return atomic.LoadInt32(&bc.procInterrupt) == 1 + return bc.procInterrupt.Load() } func (bc *BlockChain) procFutureBlocks() { @@ -1382,7 +1382,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. } // Find the next state trie we need to commit chosen := current - TriesInMemory - flushInterval := time.Duration(atomic.LoadInt64(&bc.flushInterval)) + flushInterval := time.Duration(bc.flushInterval.Load()) // If we exceeded time allowance, flush an entire trie to disk if bc.gcproc > flushInterval { // If the header is missing (canonical chain behind), we're reorging a low @@ -1735,7 +1735,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals, setHead bool) // If we have a followup block, run that against the current state to pre-cache // transactions and probabilistically some of the account/storage trie nodes. - var followupInterrupt uint32 + var followupInterrupt atomic.Bool if !bc.cacheConfig.TrieCleanNoPrefetch { if followup, err := it.peek(); followup != nil && err == nil { throwaway, _ := state.New(parent.Root, bc.stateCache, bc.snaps) @@ -1744,7 +1744,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals, setHead bool) bc.prefetcher.Prefetch(followup, throwaway, bc.vmConfig, &followupInterrupt) blockPrefetchExecuteTimer.Update(time.Since(start)) - if atomic.LoadUint32(&followupInterrupt) == 1 { + if followupInterrupt.Load() { blockPrefetchInterruptMeter.Mark(1) } }(time.Now(), followup, throwaway) @@ -1756,7 +1756,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals, setHead bool) receipts, logs, usedGas, err := bc.processor.Process(block, statedb, bc.vmConfig) if err != nil { bc.reportBlock(block, receipts, err) - atomic.StoreUint32(&followupInterrupt, 1) + followupInterrupt.Store(true) return it.index, err } ptime := time.Since(pstart) @@ -1764,7 +1764,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals, setHead bool) vstart := time.Now() if err := bc.validator.ValidateState(block, statedb, receipts, usedGas); err != nil { bc.reportBlock(block, receipts, err) - atomic.StoreUint32(&followupInterrupt, 1) + followupInterrupt.Store(true) return it.index, err } vtime := time.Since(vstart) @@ -1797,7 +1797,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals, setHead bool) } else { status, err = bc.writeBlockAndSetHead(block, receipts, logs, statedb, false) } - atomic.StoreUint32(&followupInterrupt, 1) + followupInterrupt.Store(true) if err != nil { return it.index, err } @@ -2497,5 +2497,5 @@ func (bc *BlockChain) SetBlockValidatorAndProcessorForTesting(v Validator, p Pro // The interval is in terms of block processing time, not wall clock. // It is thread-safe and can be called repeatedly without side effects. func (bc *BlockChain) SetTrieFlushInterval(interval time.Duration) { - atomic.StoreInt64(&bc.flushInterval, int64(interval)) + bc.flushInterval.Store(int64(interval)) } diff --git a/core/chain_indexer.go b/core/chain_indexer.go index 95901a0ea..23ab23ef0 100644 --- a/core/chain_indexer.go +++ b/core/chain_indexer.go @@ -75,7 +75,7 @@ type ChainIndexer struct { backend ChainIndexerBackend // Background processor generating the index data content children []*ChainIndexer // Child indexers to cascade chain updates to - active uint32 // Flag whether the event loop was started + active atomic.Bool // Flag whether the event loop was started update chan struct{} // Notification channel that headers should be processed quit chan chan error // Quit channel to tear down running goroutines ctx context.Context @@ -166,7 +166,7 @@ func (c *ChainIndexer) Close() error { errs = append(errs, err) } // If needed, tear down the secondary event loop - if atomic.LoadUint32(&c.active) != 0 { + if c.active.Load() { c.quit <- errc if err := <-errc; err != nil { errs = append(errs, err) @@ -196,7 +196,7 @@ func (c *ChainIndexer) Close() error { // queue. func (c *ChainIndexer) eventLoop(currentHeader *types.Header, events chan ChainHeadEvent, sub event.Subscription) { // Mark the chain indexer as active, requiring an additional teardown - atomic.StoreUint32(&c.active, 1) + c.active.Store(true) defer sub.Unsubscribe() diff --git a/core/state_prefetcher.go b/core/state_prefetcher.go index c258eee4f..721f4056b 100644 --- a/core/state_prefetcher.go +++ b/core/state_prefetcher.go @@ -47,7 +47,7 @@ func newStatePrefetcher(config *params.ChainConfig, bc *BlockChain, engine conse // Prefetch processes the state changes according to the Ethereum rules by running // the transaction messages using the statedb, but any changes are discarded. The // only goal is to pre-cache transaction signatures and state trie nodes. -func (p *statePrefetcher) Prefetch(block *types.Block, statedb *state.StateDB, cfg vm.Config, interrupt *uint32) { +func (p *statePrefetcher) Prefetch(block *types.Block, statedb *state.StateDB, cfg vm.Config, interrupt *atomic.Bool) { var ( header = block.Header() gaspool = new(GasPool).AddGas(block.GasLimit()) @@ -59,7 +59,7 @@ func (p *statePrefetcher) Prefetch(block *types.Block, statedb *state.StateDB, c byzantium := p.config.IsByzantium(block.Number()) for i, tx := range block.Transactions() { // If block precaching was interrupted, abort - if interrupt != nil && atomic.LoadUint32(interrupt) == 1 { + if interrupt != nil && interrupt.Load() { return } // Convert the transaction into an executable message and pre-cache its sender diff --git a/core/types.go b/core/types.go index 4c5b74a49..36eb0d1de 100644 --- a/core/types.go +++ b/core/types.go @@ -17,6 +17,8 @@ package core import ( + "sync/atomic" + "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" @@ -39,7 +41,7 @@ type Prefetcher interface { // Prefetch processes the state changes according to the Ethereum rules by running // the transaction messages using the statedb, but any changes are discarded. The // only goal is to pre-cache transaction signatures and state trie nodes. - Prefetch(block *types.Block, statedb *state.StateDB, cfg vm.Config, interrupt *uint32) + Prefetch(block *types.Block, statedb *state.StateDB, cfg vm.Config, interrupt *atomic.Bool) } // Processor is an interface for processing blocks using a given initial state.