From 25f9977f2d29e2db93ab4799e94e0f26482380d4 Mon Sep 17 00:00:00 2001 From: Stephen Guo Date: Wed, 26 Apr 2023 16:19:56 +0800 Subject: [PATCH] les: use atomic type (#27168) --- les/client_handler.go | 5 ++--- les/peer.go | 26 +++++++++++++------------- les/server_handler.go | 5 ++--- les/test_helper.go | 7 +++---- les/ulc_test.go | 3 +-- 5 files changed, 21 insertions(+), 25 deletions(-) diff --git a/les/client_handler.go b/les/client_handler.go index 25a508d42..6e7d88fc5 100644 --- a/les/client_handler.go +++ b/les/client_handler.go @@ -21,7 +21,6 @@ import ( "math/big" "math/rand" "sync" - "sync/atomic" "time" "github.com/ethereum/go-ethereum/common" @@ -144,8 +143,8 @@ func (h *clientHandler) handle(p *serverPeer, noInitAnnounce bool) error { } // Mark the peer starts to be served. - atomic.StoreUint32(&p.serving, 1) - defer atomic.StoreUint32(&p.serving, 0) + p.serving.Store(true) + defer p.serving.Store(false) // Spawn a main loop to handle all incoming messages. for { diff --git a/les/peer.go b/les/peer.go index 100cc7e4f..909d912e9 100644 --- a/les/peer.go +++ b/les/peer.go @@ -121,13 +121,13 @@ type peerCommons struct { *p2p.Peer rw p2p.MsgReadWriter - id string // Peer identity. - version int // Protocol version negotiated. - network uint64 // Network ID being on. - frozen uint32 // Flag whether the peer is frozen. - announceType uint64 // New block announcement type. - serving uint32 // The status indicates the peer is served. - headInfo blockInfo // Last announced block information. + id string // Peer identity. + version int // Protocol version negotiated. + network uint64 // Network ID being on. + frozen atomic.Bool // Flag whether the peer is frozen. + announceType uint64 // New block announcement type. + serving atomic.Bool // The status indicates the peer is served. + headInfo blockInfo // Last announced block information. // Background task queue for caching peer tasks and executing in order. sendQueue *utils.ExecQueue @@ -143,7 +143,7 @@ type peerCommons struct { // isFrozen returns true if the client is frozen or the server has put our // client in frozen state func (p *peerCommons) isFrozen() bool { - return atomic.LoadUint32(&p.frozen) != 0 + return p.frozen.Load() } // canQueue returns an indicator whether the peer can queue an operation. @@ -398,7 +398,7 @@ func (p *serverPeer) rejectUpdate(size uint64) bool { // freeze processes Stop messages from the given server and set the status as // frozen. func (p *serverPeer) freeze() { - if atomic.CompareAndSwapUint32(&p.frozen, 0, 1) { + if p.frozen.CompareAndSwap(false, true) { p.sendQueue.Clear() } } @@ -406,7 +406,7 @@ func (p *serverPeer) freeze() { // unfreeze processes Resume messages from the given server and set the status // as unfrozen. func (p *serverPeer) unfreeze() { - atomic.StoreUint32(&p.frozen, 0) + p.frozen.Store(false) } // sendRequest send a request to the server based on the given message type @@ -823,11 +823,11 @@ func (p *clientPeer) freeze() { if p.version < lpv3 { // if Stop/Resume is not supported then just drop the peer after setting // its frozen status permanently - atomic.StoreUint32(&p.frozen, 1) + p.frozen.Store(true) p.Peer.Disconnect(p2p.DiscUselessPeer) return } - if atomic.SwapUint32(&p.frozen, 1) == 0 { + if !p.frozen.Swap(true) { go func() { p.sendStop() time.Sleep(freezeTimeBase + time.Duration(rand.Int63n(int64(freezeTimeRandom)))) @@ -840,7 +840,7 @@ func (p *clientPeer) freeze() { time.Sleep(freezeCheckPeriod) continue } - atomic.StoreUint32(&p.frozen, 0) + p.frozen.Store(false) p.sendResume(bufValue) return } diff --git a/les/server_handler.go b/les/server_handler.go index 39c7ace1c..418974c37 100644 --- a/les/server_handler.go +++ b/les/server_handler.go @@ -19,7 +19,6 @@ package les import ( "errors" "sync" - "sync/atomic" "time" "github.com/ethereum/go-ethereum/common" @@ -164,8 +163,8 @@ func (h *serverHandler) handle(p *clientPeer) error { }() // Mark the peer as being served. - atomic.StoreUint32(&p.serving, 1) - defer atomic.StoreUint32(&p.serving, 0) + p.serving.Store(true) + defer p.serving.Store(false) // Spawn a main loop to handle all incoming messages. for { diff --git a/les/test_helper.go b/les/test_helper.go index 70f78498d..ead97ddd1 100644 --- a/les/test_helper.go +++ b/les/test_helper.go @@ -25,7 +25,6 @@ import ( "crypto/rand" "fmt" "math/big" - "sync/atomic" "testing" "time" @@ -380,7 +379,7 @@ func newTestPeerPair(name string, version int, server *serverHandler, client *cl return nil, nil, fmt.Errorf("failed to establish protocol connection %v", err) default: } - if atomic.LoadUint32(&peer1.serving) == 1 && atomic.LoadUint32(&peer2.serving) == 1 { + if peer1.serving.Load() && peer2.serving.Load() { break } time.Sleep(50 * time.Millisecond) @@ -441,7 +440,7 @@ func (client *testClient) newRawPeer(t *testing.T, name string, version int, rec return nil, nil, nil default: } - if atomic.LoadUint32(&peer.serving) == 1 { + if peer.serving.Load() { break } time.Sleep(50 * time.Millisecond) @@ -505,7 +504,7 @@ func (server *testServer) newRawPeer(t *testing.T, name string, version int) (*t return nil, nil, nil default: } - if atomic.LoadUint32(&peer.serving) == 1 { + if peer.serving.Load() { break } time.Sleep(50 * time.Millisecond) diff --git a/les/ulc_test.go b/les/ulc_test.go index 9a29a24ce..791bc2885 100644 --- a/les/ulc_test.go +++ b/les/ulc_test.go @@ -20,7 +20,6 @@ import ( "crypto/rand" "fmt" "net" - "sync/atomic" "testing" "time" @@ -136,7 +135,7 @@ func connect(server *serverHandler, serverId enode.ID, client *clientHandler, pr return nil, nil, fmt.Errorf("failed to establish protocol connection %v", err) default: } - if atomic.LoadUint32(&peer1.serving) == 1 && atomic.LoadUint32(&peer2.serving) == 1 { + if peer1.serving.Load() && peer2.serving.Load() { break } time.Sleep(50 * time.Millisecond)