eth/downloader: use atomic types (#27030)

* eth/downloader: use atomic type

* Update eth/downloader/downloader_test.go

Co-authored-by: Martin Holst Swende <martin@swende.se>

* Update eth/downloader/downloader_test.go

Co-authored-by: Martin Holst Swende <martin@swende.se>

---------

Co-authored-by: Martin Holst Swende <martin@swende.se>
This commit is contained in:
s7v7nislands 2023-04-04 03:48:10 +08:00 committed by GitHub
parent beda6c41ad
commit db18293c32
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 54 additions and 54 deletions

View File

@ -19,7 +19,6 @@ package downloader
import ( import (
"fmt" "fmt"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
@ -371,7 +370,7 @@ func (d *Downloader) fetchBeaconHeaders(from uint64) error {
continue continue
} }
// If the pivot block is committed, signal header sync termination // If the pivot block is committed, signal header sync termination
if atomic.LoadInt32(&d.committed) == 1 { if d.committed.Load() {
select { select {
case d.headerProcCh <- nil: case d.headerProcCh <- nil:
return nil return nil

View File

@ -98,7 +98,7 @@ type headerTask struct {
} }
type Downloader struct { type Downloader struct {
mode uint32 // Synchronisation mode defining the strategy used (per sync cycle), use d.getMode() to get the SyncMode mode atomic.Uint32 // Synchronisation mode defining the strategy used (per sync cycle), use d.getMode() to get the SyncMode
mux *event.TypeMux // Event multiplexer to announce sync operation events mux *event.TypeMux // Event multiplexer to announce sync operation events
checkpoint uint64 // Checkpoint block number to enforce head against (e.g. snap sync) checkpoint uint64 // Checkpoint block number to enforce head against (e.g. snap sync)
@ -122,9 +122,9 @@ type Downloader struct {
// Status // Status
synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing
synchronising int32 synchronising atomic.Bool
notified int32 notified atomic.Bool
committed int32 committed atomic.Bool
ancientLimit uint64 // The maximum block number which can be regarded as ancient data. ancientLimit uint64 // The maximum block number which can be regarded as ancient data.
// Channels // Channels
@ -292,7 +292,7 @@ func (d *Downloader) Progress() ethereum.SyncProgress {
// Synchronising returns whether the downloader is currently retrieving blocks. // Synchronising returns whether the downloader is currently retrieving blocks.
func (d *Downloader) Synchronising() bool { func (d *Downloader) Synchronising() bool {
return atomic.LoadInt32(&d.synchronising) > 0 return d.synchronising.Load()
} }
// RegisterPeer injects a new download peer into the set of block source to be // RegisterPeer injects a new download peer into the set of block source to be
@ -392,13 +392,13 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td, ttd *big.Int,
return d.synchroniseMock(id, hash) return d.synchroniseMock(id, hash)
} }
// Make sure only one goroutine is ever allowed past this point at once // Make sure only one goroutine is ever allowed past this point at once
if !atomic.CompareAndSwapInt32(&d.synchronising, 0, 1) { if !d.synchronising.CompareAndSwap(false, true) {
return errBusy return errBusy
} }
defer atomic.StoreInt32(&d.synchronising, 0) defer d.synchronising.Store(false)
// Post a user notification of the sync (only once per session) // Post a user notification of the sync (only once per session)
if atomic.CompareAndSwapInt32(&d.notified, 0, 1) { if d.notified.CompareAndSwap(false, true) {
log.Info("Block synchronisation started") log.Info("Block synchronisation started")
} }
if mode == SnapSync { if mode == SnapSync {
@ -435,7 +435,7 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td, ttd *big.Int,
defer d.Cancel() // No matter what, we can't leave the cancel channel open defer d.Cancel() // No matter what, we can't leave the cancel channel open
// Atomically set the requested sync mode // Atomically set the requested sync mode
atomic.StoreUint32(&d.mode, uint32(mode)) d.mode.Store(uint32(mode))
// Retrieve the origin peer and initiate the downloading process // Retrieve the origin peer and initiate the downloading process
var p *peerConnection var p *peerConnection
@ -452,7 +452,7 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td, ttd *big.Int,
} }
func (d *Downloader) getMode() SyncMode { func (d *Downloader) getMode() SyncMode {
return SyncMode(atomic.LoadUint32(&d.mode)) return SyncMode(d.mode.Load())
} }
// syncWithPeer starts a block synchronization based on the hash chain from the // syncWithPeer starts a block synchronization based on the hash chain from the
@ -562,9 +562,9 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd *
rawdb.WriteLastPivotNumber(d.stateDB, pivotNumber) rawdb.WriteLastPivotNumber(d.stateDB, pivotNumber)
} }
} }
d.committed = 1 d.committed.Store(true)
if mode == SnapSync && pivot.Number.Uint64() != 0 { if mode == SnapSync && pivot.Number.Uint64() != 0 {
d.committed = 0 d.committed.Store(false)
} }
if mode == SnapSync { if mode == SnapSync {
// Set the ancient data limitation. If we are running snap sync, all block // Set the ancient data limitation. If we are running snap sync, all block
@ -1128,7 +1128,7 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, head uint64) e
// If no more headers are inbound, notify the content fetchers and return // If no more headers are inbound, notify the content fetchers and return
if len(headers) == 0 { if len(headers) == 0 {
// Don't abort header fetches while the pivot is downloading // Don't abort header fetches while the pivot is downloading
if atomic.LoadInt32(&d.committed) == 0 && pivot <= from { if !d.committed.Load() && pivot <= from {
p.log.Debug("No headers, waiting for pivot commit") p.log.Debug("No headers, waiting for pivot commit")
select { select {
case <-time.After(fsHeaderContCheck): case <-time.After(fsHeaderContCheck):
@ -1669,7 +1669,7 @@ func (d *Downloader) processSnapSyncContent() error {
results = append(append([]*fetchResult{oldPivot}, oldTail...), results...) results = append(append([]*fetchResult{oldPivot}, oldTail...), results...)
} }
// Split around the pivot block and process the two sides via snap/full sync // Split around the pivot block and process the two sides via snap/full sync
if atomic.LoadInt32(&d.committed) == 0 { if !d.committed.Load() {
latest := results[len(results)-1].Header latest := results[len(results)-1].Header
// If the height is above the pivot block by 2 sets, it means the pivot // If the height is above the pivot block by 2 sets, it means the pivot
// become stale in the network and it was garbage collected, move to a // become stale in the network and it was garbage collected, move to a
@ -1794,7 +1794,7 @@ func (d *Downloader) commitPivotBlock(result *fetchResult) error {
if err := d.blockchain.SnapSyncCommitHead(block.Hash()); err != nil { if err := d.blockchain.SnapSyncCommitHead(block.Hash()); err != nil {
return err return err
} }
atomic.StoreInt32(&d.committed, 1) d.committed.Store(true)
return nil return nil
} }

View File

@ -476,9 +476,10 @@ func testThrottling(t *testing.T, protocol uint, mode SyncMode) {
tester.newPeer("peer", protocol, testChainBase.blocks[1:]) tester.newPeer("peer", protocol, testChainBase.blocks[1:])
// Wrap the importer to allow stepping // Wrap the importer to allow stepping
blocked, proceed := uint32(0), make(chan struct{}) var blocked atomic.Uint32
proceed := make(chan struct{})
tester.downloader.chainInsertHook = func(results []*fetchResult) { tester.downloader.chainInsertHook = func(results []*fetchResult) {
atomic.StoreUint32(&blocked, uint32(len(results))) blocked.Store(uint32(len(results)))
<-proceed <-proceed
} }
// Start a synchronisation concurrently // Start a synchronisation concurrently
@ -505,7 +506,7 @@ func testThrottling(t *testing.T, protocol uint, mode SyncMode) {
tester.downloader.queue.resultCache.lock.Lock() tester.downloader.queue.resultCache.lock.Lock()
{ {
cached = tester.downloader.queue.resultCache.countCompleted() cached = tester.downloader.queue.resultCache.countCompleted()
frozen = int(atomic.LoadUint32(&blocked)) frozen = int(blocked.Load())
retrieved = int(tester.chain.CurrentSnapBlock().Number.Uint64()) + 1 retrieved = int(tester.chain.CurrentSnapBlock().Number.Uint64()) + 1
} }
tester.downloader.queue.resultCache.lock.Unlock() tester.downloader.queue.resultCache.lock.Unlock()
@ -528,8 +529,8 @@ func testThrottling(t *testing.T, protocol uint, mode SyncMode) {
t.Fatalf("block count mismatch: have %v, want %v (owned %v, blocked %v, target %v)", cached, blockCacheMaxItems, retrieved, frozen, targetBlocks+1) t.Fatalf("block count mismatch: have %v, want %v (owned %v, blocked %v, target %v)", cached, blockCacheMaxItems, retrieved, frozen, targetBlocks+1)
} }
// Permit the blocked blocks to import // Permit the blocked blocks to import
if atomic.LoadUint32(&blocked) > 0 { if blocked.Load() > 0 {
atomic.StoreUint32(&blocked, uint32(0)) blocked.Store(uint32(0))
proceed <- struct{}{} proceed <- struct{}{}
} }
} }
@ -786,12 +787,12 @@ func testEmptyShortCircuit(t *testing.T, protocol uint, mode SyncMode) {
tester.newPeer("peer", protocol, chain.blocks[1:]) tester.newPeer("peer", protocol, chain.blocks[1:])
// Instrument the downloader to signal body requests // Instrument the downloader to signal body requests
bodiesHave, receiptsHave := int32(0), int32(0) var bodiesHave, receiptsHave atomic.Int32
tester.downloader.bodyFetchHook = func(headers []*types.Header) { tester.downloader.bodyFetchHook = func(headers []*types.Header) {
atomic.AddInt32(&bodiesHave, int32(len(headers))) bodiesHave.Add(int32(len(headers)))
} }
tester.downloader.receiptFetchHook = func(headers []*types.Header) { tester.downloader.receiptFetchHook = func(headers []*types.Header) {
atomic.AddInt32(&receiptsHave, int32(len(headers))) receiptsHave.Add(int32(len(headers)))
} }
// Synchronise with the peer and make sure all blocks were retrieved // Synchronise with the peer and make sure all blocks were retrieved
if err := tester.sync("peer", nil, mode); err != nil { if err := tester.sync("peer", nil, mode); err != nil {
@ -811,11 +812,11 @@ func testEmptyShortCircuit(t *testing.T, protocol uint, mode SyncMode) {
receiptsNeeded++ receiptsNeeded++
} }
} }
if int(bodiesHave) != bodiesNeeded { if int(bodiesHave.Load()) != bodiesNeeded {
t.Errorf("body retrieval count mismatch: have %v, want %v", bodiesHave, bodiesNeeded) t.Errorf("body retrieval count mismatch: have %v, want %v", bodiesHave.Load(), bodiesNeeded)
} }
if int(receiptsHave) != receiptsNeeded { if int(receiptsHave.Load()) != receiptsNeeded {
t.Errorf("receipt retrieval count mismatch: have %v, want %v", receiptsHave, receiptsNeeded) t.Errorf("receipt retrieval count mismatch: have %v, want %v", receiptsHave.Load(), receiptsNeeded)
} }
} }

View File

@ -61,7 +61,7 @@ type fetchRequest struct {
// fetchResult is a struct collecting partial results from data fetchers until // fetchResult is a struct collecting partial results from data fetchers until
// all outstanding pieces complete and the result as a whole can be processed. // all outstanding pieces complete and the result as a whole can be processed.
type fetchResult struct { type fetchResult struct {
pending int32 // Flag telling what deliveries are outstanding pending atomic.Int32 // Flag telling what deliveries are outstanding
Header *types.Header Header *types.Header
Uncles []*types.Header Uncles []*types.Header
@ -75,38 +75,38 @@ func newFetchResult(header *types.Header, fastSync bool) *fetchResult {
Header: header, Header: header,
} }
if !header.EmptyBody() { if !header.EmptyBody() {
item.pending |= (1 << bodyType) item.pending.Store(item.pending.Load() | (1 << bodyType))
} else if header.WithdrawalsHash != nil { } else if header.WithdrawalsHash != nil {
item.Withdrawals = make(types.Withdrawals, 0) item.Withdrawals = make(types.Withdrawals, 0)
} }
if fastSync && !header.EmptyReceipts() { if fastSync && !header.EmptyReceipts() {
item.pending |= (1 << receiptType) item.pending.Store(item.pending.Load() | (1 << receiptType))
} }
return item return item
} }
// SetBodyDone flags the body as finished. // SetBodyDone flags the body as finished.
func (f *fetchResult) SetBodyDone() { func (f *fetchResult) SetBodyDone() {
if v := atomic.LoadInt32(&f.pending); (v & (1 << bodyType)) != 0 { if v := f.pending.Load(); (v & (1 << bodyType)) != 0 {
atomic.AddInt32(&f.pending, -1) f.pending.Add(-1)
} }
} }
// AllDone checks if item is done. // AllDone checks if item is done.
func (f *fetchResult) AllDone() bool { func (f *fetchResult) AllDone() bool {
return atomic.LoadInt32(&f.pending) == 0 return f.pending.Load() == 0
} }
// SetReceiptsDone flags the receipts as finished. // SetReceiptsDone flags the receipts as finished.
func (f *fetchResult) SetReceiptsDone() { func (f *fetchResult) SetReceiptsDone() {
if v := atomic.LoadInt32(&f.pending); (v & (1 << receiptType)) != 0 { if v := f.pending.Load(); (v & (1 << receiptType)) != 0 {
atomic.AddInt32(&f.pending, -2) f.pending.Add(-2)
} }
} }
// Done checks if the given type is done already // Done checks if the given type is done already
func (f *fetchResult) Done(kind uint) bool { func (f *fetchResult) Done(kind uint) bool {
v := atomic.LoadInt32(&f.pending) v := f.pending.Load()
return v&(1<<kind) == 0 return v&(1<<kind) == 0
} }

View File

@ -33,7 +33,7 @@ type resultStore struct {
// Internal index of first non-completed entry, updated atomically when needed. // Internal index of first non-completed entry, updated atomically when needed.
// If all items are complete, this will equal length(items), so // If all items are complete, this will equal length(items), so
// *important* : is not safe to use for indexing without checking against length // *important* : is not safe to use for indexing without checking against length
indexIncomplete int32 // atomic access indexIncomplete atomic.Int32
// throttleThreshold is the limit up to which we _want_ to fill the // throttleThreshold is the limit up to which we _want_ to fill the
// results. If blocks are large, we want to limit the results to less // results. If blocks are large, we want to limit the results to less
@ -146,7 +146,7 @@ func (r *resultStore) HasCompletedItems() bool {
func (r *resultStore) countCompleted() int { func (r *resultStore) countCompleted() int {
// We iterate from the already known complete point, and see // We iterate from the already known complete point, and see
// if any more has completed since last count // if any more has completed since last count
index := atomic.LoadInt32(&r.indexIncomplete) index := r.indexIncomplete.Load()
for ; ; index++ { for ; ; index++ {
if index >= int32(len(r.items)) { if index >= int32(len(r.items)) {
break break
@ -156,7 +156,7 @@ func (r *resultStore) countCompleted() int {
break break
} }
} }
atomic.StoreInt32(&r.indexIncomplete, index) r.indexIncomplete.Store(index)
return int(index) return int(index)
} }
@ -179,7 +179,7 @@ func (r *resultStore) GetCompleted(limit int) []*fetchResult {
} }
// Advance the expected block number of the first cache entry // Advance the expected block number of the first cache entry
r.resultOffset += uint64(limit) r.resultOffset += uint64(limit)
atomic.AddInt32(&r.indexIncomplete, int32(-limit)) r.indexIncomplete.Add(int32(-limit))
return results return results
} }

View File

@ -82,8 +82,8 @@ type skeletonTestPeer struct {
serve func(origin uint64) []*types.Header // Hook to allow custom responses serve func(origin uint64) []*types.Header // Hook to allow custom responses
served uint64 // Number of headers served by this peer served atomic.Uint64 // Number of headers served by this peer
dropped uint64 // Flag whether the peer was dropped (stop responding) dropped atomic.Uint64 // Flag whether the peer was dropped (stop responding)
} }
// newSkeletonTestPeer creates a new mock peer to test the skeleton sync with. // newSkeletonTestPeer creates a new mock peer to test the skeleton sync with.
@ -113,7 +113,7 @@ func (p *skeletonTestPeer) RequestHeadersByNumber(origin uint64, amount int, ski
// Since skeleton test peer are in-memory mocks, dropping the does not make // Since skeleton test peer are in-memory mocks, dropping the does not make
// them inaccessible. As such, check a local `dropped` field to see if the // them inaccessible. As such, check a local `dropped` field to see if the
// peer has been dropped and should not respond any more. // peer has been dropped and should not respond any more.
if atomic.LoadUint64(&p.dropped) != 0 { if p.dropped.Load() != 0 {
return nil, errors.New("peer already dropped") return nil, errors.New("peer already dropped")
} }
// Skeleton sync retrieves batches of headers going backward without gaps. // Skeleton sync retrieves batches of headers going backward without gaps.
@ -161,7 +161,7 @@ func (p *skeletonTestPeer) RequestHeadersByNumber(origin uint64, amount int, ski
} }
} }
} }
atomic.AddUint64(&p.served, uint64(len(headers))) p.served.Add(uint64(len(headers)))
hashes := make([]common.Hash, len(headers)) hashes := make([]common.Hash, len(headers))
for i, header := range headers { for i, header := range headers {
@ -182,7 +182,7 @@ func (p *skeletonTestPeer) RequestHeadersByNumber(origin uint64, amount int, ski
sink <- res sink <- res
if err := <-res.Done; err != nil { if err := <-res.Done; err != nil {
log.Warn("Skeleton test peer response rejected", "err", err) log.Warn("Skeleton test peer response rejected", "err", err)
atomic.AddUint64(&p.dropped, 1) p.dropped.Add(1)
} }
}() }()
return req, nil return req, nil
@ -817,7 +817,7 @@ func TestSkeletonSyncRetrievals(t *testing.T) {
dropped := make(map[string]int) dropped := make(map[string]int)
drop := func(peer string) { drop := func(peer string) {
if p := peerset.Peer(peer); p != nil { if p := peerset.Peer(peer); p != nil {
atomic.AddUint64(&p.peer.(*skeletonTestPeer).dropped, 1) p.peer.(*skeletonTestPeer).dropped.Add(1)
} }
peerset.Unregister(peer) peerset.Unregister(peer)
dropped[peer]++ dropped[peer]++
@ -895,14 +895,14 @@ func TestSkeletonSyncRetrievals(t *testing.T) {
if !tt.unpredictable { if !tt.unpredictable {
var served uint64 var served uint64
for _, peer := range tt.peers { for _, peer := range tt.peers {
served += atomic.LoadUint64(&peer.served) served += peer.served.Load()
} }
if served != tt.midserve { if served != tt.midserve {
t.Errorf("test %d, mid state: served headers mismatch: have %d, want %d", i, served, tt.midserve) t.Errorf("test %d, mid state: served headers mismatch: have %d, want %d", i, served, tt.midserve)
} }
var drops uint64 var drops uint64
for _, peer := range tt.peers { for _, peer := range tt.peers {
drops += atomic.LoadUint64(&peer.dropped) drops += peer.dropped.Load()
} }
if drops != tt.middrop { if drops != tt.middrop {
t.Errorf("test %d, mid state: dropped peers mismatch: have %d, want %d", i, drops, tt.middrop) t.Errorf("test %d, mid state: dropped peers mismatch: have %d, want %d", i, drops, tt.middrop)
@ -950,20 +950,20 @@ func TestSkeletonSyncRetrievals(t *testing.T) {
if !tt.unpredictable { if !tt.unpredictable {
served := uint64(0) served := uint64(0)
for _, peer := range tt.peers { for _, peer := range tt.peers {
served += atomic.LoadUint64(&peer.served) served += peer.served.Load()
} }
if tt.newPeer != nil { if tt.newPeer != nil {
served += atomic.LoadUint64(&tt.newPeer.served) served += tt.newPeer.served.Load()
} }
if served != tt.endserve { if served != tt.endserve {
t.Errorf("test %d, end state: served headers mismatch: have %d, want %d", i, served, tt.endserve) t.Errorf("test %d, end state: served headers mismatch: have %d, want %d", i, served, tt.endserve)
} }
drops := uint64(0) drops := uint64(0)
for _, peer := range tt.peers { for _, peer := range tt.peers {
drops += atomic.LoadUint64(&peer.dropped) drops += peer.dropped.Load()
} }
if tt.newPeer != nil { if tt.newPeer != nil {
drops += atomic.LoadUint64(&tt.newPeer.dropped) drops += tt.newPeer.dropped.Load()
} }
if drops != tt.enddrop { if drops != tt.enddrop {
t.Errorf("test %d, end state: dropped peers mismatch: have %d, want %d", i, drops, tt.middrop) t.Errorf("test %d, end state: dropped peers mismatch: have %d, want %d", i, drops, tt.middrop)