From f541cad272087d6633122d9f955881c8597a510f Mon Sep 17 00:00:00 2001 From: s7v7nislands Date: Tue, 25 Apr 2023 18:06:50 +0800 Subject: [PATCH] eth: use new atomic types (#27137) --- eth/backend.go | 7 +++-- eth/fetcher/block_fetcher_test.go | 44 +++++++++++++++---------------- eth/gasprice/feehistory.go | 10 +++---- eth/handler.go | 20 +++++++------- eth/handler_eth.go | 3 +-- eth/handler_eth_test.go | 6 ++--- eth/protocols/snap/sync.go | 14 +++++----- eth/sync.go | 9 +++---- eth/sync_test.go | 7 +++-- 9 files changed, 58 insertions(+), 62 deletions(-) diff --git a/eth/backend.go b/eth/backend.go index 105a8e897..24ae08502 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -23,7 +23,6 @@ import ( "math/big" "runtime" "sync" - "sync/atomic" "github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/common" @@ -437,7 +436,7 @@ func (s *Ethereum) StartMining(threads int) error { } // If mining is started, we can disable the transaction rejection mechanism // introduced to speed sync times. - atomic.StoreUint32(&s.handler.acceptTxs, 1) + s.handler.acceptTxs.Store(true) go s.miner.Start() } @@ -469,8 +468,8 @@ func (s *Ethereum) Engine() consensus.Engine { return s.engine } func (s *Ethereum) ChainDb() ethdb.Database { return s.chainDb } func (s *Ethereum) IsListening() bool { return true } // Always listening func (s *Ethereum) Downloader() *downloader.Downloader { return s.handler.downloader } -func (s *Ethereum) Synced() bool { return atomic.LoadUint32(&s.handler.acceptTxs) == 1 } -func (s *Ethereum) SetSynced() { atomic.StoreUint32(&s.handler.acceptTxs, 1) } +func (s *Ethereum) Synced() bool { return s.handler.acceptTxs.Load() } +func (s *Ethereum) SetSynced() { s.handler.acceptTxs.Store(true) } func (s *Ethereum) ArchiveMode() bool { return s.config.NoPruning } func (s *Ethereum) BloomIndexer() *core.ChainIndexer { return s.bloomIndexer } func (s *Ethereum) Merger() *consensus.Merger { return s.merger } diff --git a/eth/fetcher/block_fetcher_test.go b/eth/fetcher/block_fetcher_test.go index 0553add4f..520fc9c7b 100644 --- a/eth/fetcher/block_fetcher_test.go +++ b/eth/fetcher/block_fetcher_test.go @@ -413,13 +413,13 @@ func testConcurrentAnnouncements(t *testing.T, light bool) { secondHeaderFetcher := tester.makeHeaderFetcher("second", blocks, -gatherSlack) secondBodyFetcher := tester.makeBodyFetcher("second", blocks, 0) - counter := uint32(0) + var counter atomic.Uint32 firstHeaderWrapper := func(hash common.Hash, sink chan *eth.Response) (*eth.Request, error) { - atomic.AddUint32(&counter, 1) + counter.Add(1) return firstHeaderFetcher(hash, sink) } secondHeaderWrapper := func(hash common.Hash, sink chan *eth.Response) (*eth.Request, error) { - atomic.AddUint32(&counter, 1) + counter.Add(1) return secondHeaderFetcher(hash, sink) } // Iteratively announce blocks until all are imported @@ -446,8 +446,8 @@ func testConcurrentAnnouncements(t *testing.T, light bool) { verifyImportDone(t, imported) // Make sure no blocks were retrieved twice - if int(counter) != targetBlocks { - t.Fatalf("retrieval count mismatch: have %v, want %v", counter, targetBlocks) + if c := int(counter.Load()); c != targetBlocks { + t.Fatalf("retrieval count mismatch: have %v, want %v", c, targetBlocks) } verifyChainHeight(t, tester, uint64(len(hashes)-1)) } @@ -513,9 +513,9 @@ func testPendingDeduplication(t *testing.T, light bool) { bodyFetcher := tester.makeBodyFetcher("repeater", blocks, 0) delay := 50 * time.Millisecond - counter := uint32(0) + var counter atomic.Uint32 headerWrapper := func(hash common.Hash, sink chan *eth.Response) (*eth.Request, error) { - atomic.AddUint32(&counter, 1) + counter.Add(1) // Simulate a long running fetch resink := make(chan *eth.Response) @@ -545,8 +545,8 @@ func testPendingDeduplication(t *testing.T, light bool) { time.Sleep(delay) // Check that all blocks were imported and none fetched twice - if int(counter) != 1 { - t.Fatalf("retrieval count mismatch: have %v, want %v", counter, 1) + if c := counter.Load(); c != 1 { + t.Fatalf("retrieval count mismatch: have %v, want %v", c, 1) } verifyChainHeight(t, tester, 1) } @@ -632,9 +632,9 @@ func TestImportDeduplication(t *testing.T) { headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack) bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0) - counter := uint32(0) + var counter atomic.Uint32 tester.fetcher.insertChain = func(blocks types.Blocks) (int, error) { - atomic.AddUint32(&counter, uint32(len(blocks))) + counter.Add(uint32(len(blocks))) return tester.insertChain(blocks) } // Instrument the fetching and imported events @@ -655,8 +655,8 @@ func TestImportDeduplication(t *testing.T) { tester.fetcher.Enqueue("valid", blocks[hashes[1]]) verifyImportCount(t, imported, 2) - if counter != 2 { - t.Fatalf("import invocation count mismatch: have %v, want %v", counter, 2) + if c := counter.Load(); c != 2 { + t.Fatalf("import invocation count mismatch: have %v, want %v", c, 2) } } @@ -853,13 +853,13 @@ func TestHashMemoryExhaustionAttack(t *testing.T) { // Create a tester with instrumented import hooks tester := newTester(false) - imported, announces := make(chan interface{}), int32(0) + imported, announces := make(chan interface{}), atomic.Int32{} tester.fetcher.importedHook = func(header *types.Header, block *types.Block) { imported <- block } tester.fetcher.announceChangeHook = func(hash common.Hash, added bool) { if added { - atomic.AddInt32(&announces, 1) + announces.Add(1) } else { - atomic.AddInt32(&announces, -1) + announces.Add(-1) } } // Create a valid chain and an infinite junk chain @@ -879,7 +879,7 @@ func TestHashMemoryExhaustionAttack(t *testing.T) { } tester.fetcher.Notify("attacker", attack[i], 1 /* don't distance drop */, time.Now(), attackerHeaderFetcher, attackerBodyFetcher) } - if count := atomic.LoadInt32(&announces); count != hashLimit+maxQueueDist { + if count := announces.Load(); count != hashLimit+maxQueueDist { t.Fatalf("queued announce count mismatch: have %d, want %d", count, hashLimit+maxQueueDist) } // Wait for fetches to complete @@ -900,13 +900,13 @@ func TestBlockMemoryExhaustionAttack(t *testing.T) { // Create a tester with instrumented import hooks tester := newTester(false) - imported, enqueued := make(chan interface{}), int32(0) + imported, enqueued := make(chan interface{}), atomic.Int32{} tester.fetcher.importedHook = func(header *types.Header, block *types.Block) { imported <- block } tester.fetcher.queueChangeHook = func(hash common.Hash, added bool) { if added { - atomic.AddInt32(&enqueued, 1) + enqueued.Add(1) } else { - atomic.AddInt32(&enqueued, -1) + enqueued.Add(-1) } } // Create a valid chain and a batch of dangling (but in range) blocks @@ -924,7 +924,7 @@ func TestBlockMemoryExhaustionAttack(t *testing.T) { tester.fetcher.Enqueue("attacker", block) } time.Sleep(200 * time.Millisecond) - if queued := atomic.LoadInt32(&enqueued); queued != blockLimit { + if queued := enqueued.Load(); queued != blockLimit { t.Fatalf("queued block count mismatch: have %d, want %d", queued, blockLimit) } // Queue up a batch of valid blocks, and check that a new peer is allowed to do so @@ -932,7 +932,7 @@ func TestBlockMemoryExhaustionAttack(t *testing.T) { tester.fetcher.Enqueue("valid", blocks[hashes[len(hashes)-3-i]]) } time.Sleep(100 * time.Millisecond) - if queued := atomic.LoadInt32(&enqueued); queued != blockLimit+maxQueueDist-1 { + if queued := enqueued.Load(); queued != blockLimit+maxQueueDist-1 { t.Fatalf("queued block count mismatch: have %d, want %d", queued, blockLimit+maxQueueDist-1) } // Insert the missing piece (and sanity check the import) diff --git a/eth/gasprice/feehistory.go b/eth/gasprice/feehistory.go index 82edf9a7f..ee0b0ebcc 100644 --- a/eth/gasprice/feehistory.go +++ b/eth/gasprice/feehistory.go @@ -251,10 +251,10 @@ func (oracle *Oracle) FeeHistory(ctx context.Context, blocks uint64, unresolvedL } oldestBlock := lastBlock + 1 - blocks - var ( - next = oldestBlock - results = make(chan *blockFees, blocks) - ) + var next atomic.Uint64 + next.Store(oldestBlock) + results := make(chan *blockFees, blocks) + percentileKey := make([]byte, 8*len(rewardPercentiles)) for i, p := range rewardPercentiles { binary.LittleEndian.PutUint64(percentileKey[i*8:(i+1)*8], math.Float64bits(p)) @@ -263,7 +263,7 @@ func (oracle *Oracle) FeeHistory(ctx context.Context, blocks uint64, unresolvedL go func() { for { // Retrieve the next block number to fetch with this goroutine - blockNumber := atomic.AddUint64(&next, 1) - 1 + blockNumber := next.Add(1) - 1 if blockNumber > lastBlock { return } diff --git a/eth/handler.go b/eth/handler.go index 71ed6b227..d4c538e6f 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -91,8 +91,8 @@ type handler struct { networkID uint64 forkFilter forkid.Filter // Fork ID filter, constant across the lifetime of the node - snapSync uint32 // Flag whether snap sync is enabled (gets disabled if we already have blocks) - acceptTxs uint32 // Flag whether we're considered synchronised (enables transaction processing) + snapSync atomic.Bool // Flag whether snap sync is enabled (gets disabled if we already have blocks) + acceptTxs atomic.Bool // Flag whether we're considered synchronised (enables transaction processing) database ethdb.Database txpool txPool @@ -149,7 +149,7 @@ func newHandler(config *handlerConfig) (*handler, error) { // In these cases however it's safe to reenable snap sync. fullBlock, snapBlock := h.chain.CurrentBlock(), h.chain.CurrentSnapBlock() if fullBlock.Number.Uint64() == 0 && snapBlock.Number.Uint64() > 0 { - h.snapSync = uint32(1) + h.snapSync.Store(true) log.Warn("Switch sync mode from full sync to snap sync") } } else { @@ -158,7 +158,7 @@ func newHandler(config *handlerConfig) (*handler, error) { log.Warn("Switch sync mode from snap sync to full sync") } else { // If snap sync was requested and our database is empty, grant it - h.snapSync = uint32(1) + h.snapSync.Store(true) } } // If sync succeeds, pass a callback to potentially disable snap sync mode @@ -166,13 +166,13 @@ func newHandler(config *handlerConfig) (*handler, error) { success := func() { // If we were running snap sync and it finished, disable doing another // round on next sync cycle - if atomic.LoadUint32(&h.snapSync) == 1 { + if h.snapSync.Load() { log.Info("Snap sync complete, auto disabling") - atomic.StoreUint32(&h.snapSync, 0) + h.snapSync.Store(false) } // If we've successfully finished a sync cycle, accept transactions from // the network - atomic.StoreUint32(&h.acceptTxs, 1) + h.acceptTxs.Store(true) } // Construct the downloader (long sync) h.downloader = downloader.New(config.Database, h.eventMux, h.chain, nil, h.removePeer, success) @@ -232,7 +232,7 @@ func newHandler(config *handlerConfig) (*handler, error) { // accept each others' blocks until a restart. Unfortunately we haven't figured // out a way yet where nodes can decide unilaterally whether the network is new // or not. This should be fixed if we figure out a solution. - if atomic.LoadUint32(&h.snapSync) == 1 { + if h.snapSync.Load() { log.Warn("Snap syncing, discarded propagated block", "number", blocks[0].Number(), "hash", blocks[0].Hash()) return 0, nil } @@ -261,7 +261,7 @@ func newHandler(config *handlerConfig) (*handler, error) { } n, err := h.chain.InsertChain(blocks) if err == nil { - atomic.StoreUint32(&h.acceptTxs, 1) // Mark initial sync done on any fetcher import + h.acceptTxs.Store(true) // Mark initial sync done on any fetcher import } return n, err } @@ -310,7 +310,7 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error { return err } reject := false // reserved peer slots - if atomic.LoadUint32(&h.snapSync) == 1 { + if h.snapSync.Load() { if snap == nil { // If we are running snap-sync, we want to reserve roughly half the peer // slots for peers supporting the snap protocol. diff --git a/eth/handler_eth.go b/eth/handler_eth.go index 4ed633576..00be022d9 100644 --- a/eth/handler_eth.go +++ b/eth/handler_eth.go @@ -19,7 +19,6 @@ package eth import ( "fmt" "math/big" - "sync/atomic" "time" "github.com/ethereum/go-ethereum/common" @@ -52,7 +51,7 @@ func (h *ethHandler) PeerInfo(id enode.ID) interface{} { // AcceptTxs retrieves whether transaction processing is enabled on the node // or if inbound transactions should simply be dropped. func (h *ethHandler) AcceptTxs() bool { - return atomic.LoadUint32(&h.acceptTxs) == 1 + return h.acceptTxs.Load() } // Handle is invoked from a peer's message handler when it receives a new remote diff --git a/eth/handler_eth_test.go b/eth/handler_eth_test.go index d51d01de9..fd7df2340 100644 --- a/eth/handler_eth_test.go +++ b/eth/handler_eth_test.go @@ -248,7 +248,7 @@ func testRecvTransactions(t *testing.T, protocol uint) { handler := newTestHandler() defer handler.close() - handler.handler.acceptTxs = 1 // mark synced to accept transactions + handler.handler.acceptTxs.Store(true) // mark synced to accept transactions txs := make(chan core.NewTxsEvent) sub := handler.txpool.SubscribeNewTxsEvent(txs) @@ -394,7 +394,7 @@ func testTransactionPropagation(t *testing.T, protocol uint) { // to receive them. We need multiple sinks since a one-to-one peering would // broadcast all transactions without announcement. source := newTestHandler() - source.handler.snapSync = 0 // Avoid requiring snap, otherwise some will be dropped below + source.handler.snapSync.Store(false) // Avoid requiring snap, otherwise some will be dropped below defer source.close() sinks := make([]*testHandler, 10) @@ -402,7 +402,7 @@ func testTransactionPropagation(t *testing.T, protocol uint) { sinks[i] = newTestHandler() defer sinks[i].close() - sinks[i].handler.acceptTxs = 1 // mark synced to accept transactions + sinks[i].handler.acceptTxs.Store(true) // mark synced to accept transactions } // Interconnect all the sink handlers with the source handler for i, sink := range sinks { diff --git a/eth/protocols/snap/sync.go b/eth/protocols/snap/sync.go index 13279fd96..e99eb8843 100644 --- a/eth/protocols/snap/sync.go +++ b/eth/protocols/snap/sync.go @@ -449,10 +449,10 @@ type Syncer struct { trienodeHealReqs map[uint64]*trienodeHealRequest // Trie node requests currently running bytecodeHealReqs map[uint64]*bytecodeHealRequest // Bytecode requests currently running - trienodeHealRate float64 // Average heal rate for processing trie node data - trienodeHealPend uint64 // Number of trie nodes currently pending for processing - trienodeHealThrottle float64 // Divisor for throttling the amount of trienode heal data requested - trienodeHealThrottled time.Time // Timestamp the last time the throttle was updated + trienodeHealRate float64 // Average heal rate for processing trie node data + trienodeHealPend atomic.Uint64 // Number of trie nodes currently pending for processing + trienodeHealThrottle float64 // Divisor for throttling the amount of trienode heal data requested + trienodeHealThrottled time.Time // Timestamp the last time the throttle was updated trienodeHealSynced uint64 // Number of state trie nodes downloaded trienodeHealBytes common.StorageSize // Number of state trie bytes persisted to disk @@ -2189,7 +2189,7 @@ func (s *Syncer) processTrienodeHealResponse(res *trienodeHealResponse) { // HR(N) = (1-MI)^N*(OR-NR) + NR s.trienodeHealRate = gomath.Pow(1-trienodeHealRateMeasurementImpact, float64(fills))*(s.trienodeHealRate-rate) + rate - pending := atomic.LoadUint64(&s.trienodeHealPend) + pending := s.trienodeHealPend.Load() if time.Since(s.trienodeHealThrottled) > time.Second { // Periodically adjust the trie node throttler if float64(pending) > 2*s.trienodeHealRate { @@ -2776,9 +2776,9 @@ func (s *Syncer) OnTrieNodes(peer SyncPeer, id uint64, trienodes [][]byte) error return errors.New("unexpected healing trienode") } // Response validated, send it to the scheduler for filling - atomic.AddUint64(&s.trienodeHealPend, fills) + s.trienodeHealPend.Add(fills) defer func() { - atomic.AddUint64(&s.trienodeHealPend, ^(fills - 1)) + s.trienodeHealPend.Add(^(fills - 1)) }() response := &trienodeHealResponse{ paths: req.paths, diff --git a/eth/sync.go b/eth/sync.go index e56d470cf..ace107114 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -19,7 +19,6 @@ package eth import ( "errors" "math/big" - "sync/atomic" "time" "github.com/ethereum/go-ethereum/common" @@ -205,7 +204,7 @@ func peerToSyncOp(mode downloader.SyncMode, p *eth.Peer) *chainSyncOp { func (cs *chainSyncer) modeAndLocalHead() (downloader.SyncMode, *big.Int) { // If we're in snap sync mode, return that directly - if atomic.LoadUint32(&cs.handler.snapSync) == 1 { + if cs.handler.snapSync.Load() { block := cs.handler.chain.CurrentSnapBlock() td := cs.handler.chain.GetTd(block.Hash(), block.Number.Uint64()) return downloader.SnapSync, td @@ -256,13 +255,13 @@ func (h *handler) doSync(op *chainSyncOp) error { if err != nil { return err } - if atomic.LoadUint32(&h.snapSync) == 1 { + if h.snapSync.Load() { log.Info("Snap sync complete, auto disabling") - atomic.StoreUint32(&h.snapSync, 0) + h.snapSync.Store(false) } // If we've successfully finished a sync cycle, enable accepting transactions // from the network. - atomic.StoreUint32(&h.acceptTxs, 1) + h.acceptTxs.Store(true) head := h.chain.CurrentBlock() if head.Number.Uint64() > 0 { diff --git a/eth/sync_test.go b/eth/sync_test.go index 0b9f9e1bb..b5e00298b 100644 --- a/eth/sync_test.go +++ b/eth/sync_test.go @@ -17,7 +17,6 @@ package eth import ( - "sync/atomic" "testing" "time" @@ -39,14 +38,14 @@ func testSnapSyncDisabling(t *testing.T, ethVer uint, snapVer uint) { // Create an empty handler and ensure it's in snap sync mode empty := newTestHandler() - if atomic.LoadUint32(&empty.handler.snapSync) == 0 { + if !empty.handler.snapSync.Load() { t.Fatalf("snap sync disabled on pristine blockchain") } defer empty.close() // Create a full handler and ensure snap sync ends up disabled full := newTestHandlerWithBlocks(1024) - if atomic.LoadUint32(&full.handler.snapSync) == 1 { + if full.handler.snapSync.Load() { t.Fatalf("snap sync not disabled on non-empty blockchain") } defer full.close() @@ -91,7 +90,7 @@ func testSnapSyncDisabling(t *testing.T, ethVer uint, snapVer uint) { if err := empty.handler.doSync(op); err != nil { t.Fatal("sync failed:", err) } - if atomic.LoadUint32(&empty.handler.snapSync) == 1 { + if empty.handler.snapSync.Load() { t.Fatalf("snap sync not disabled after successful synchronisation") } }