les: use atomic type (#27168)

This commit is contained in:
Stephen Guo 2023-04-26 16:19:56 +08:00 committed by GitHub
parent f8aa623536
commit 25f9977f2d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 21 additions and 25 deletions

View File

@ -21,7 +21,6 @@ import (
"math/big" "math/big"
"math/rand" "math/rand"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
@ -144,8 +143,8 @@ func (h *clientHandler) handle(p *serverPeer, noInitAnnounce bool) error {
} }
// Mark the peer starts to be served. // Mark the peer starts to be served.
atomic.StoreUint32(&p.serving, 1) p.serving.Store(true)
defer atomic.StoreUint32(&p.serving, 0) defer p.serving.Store(false)
// Spawn a main loop to handle all incoming messages. // Spawn a main loop to handle all incoming messages.
for { for {

View File

@ -121,13 +121,13 @@ type peerCommons struct {
*p2p.Peer *p2p.Peer
rw p2p.MsgReadWriter rw p2p.MsgReadWriter
id string // Peer identity. id string // Peer identity.
version int // Protocol version negotiated. version int // Protocol version negotiated.
network uint64 // Network ID being on. network uint64 // Network ID being on.
frozen uint32 // Flag whether the peer is frozen. frozen atomic.Bool // Flag whether the peer is frozen.
announceType uint64 // New block announcement type. announceType uint64 // New block announcement type.
serving uint32 // The status indicates the peer is served. serving atomic.Bool // The status indicates the peer is served.
headInfo blockInfo // Last announced block information. headInfo blockInfo // Last announced block information.
// Background task queue for caching peer tasks and executing in order. // Background task queue for caching peer tasks and executing in order.
sendQueue *utils.ExecQueue sendQueue *utils.ExecQueue
@ -143,7 +143,7 @@ type peerCommons struct {
// isFrozen returns true if the client is frozen or the server has put our // isFrozen returns true if the client is frozen or the server has put our
// client in frozen state // client in frozen state
func (p *peerCommons) isFrozen() bool { func (p *peerCommons) isFrozen() bool {
return atomic.LoadUint32(&p.frozen) != 0 return p.frozen.Load()
} }
// canQueue returns an indicator whether the peer can queue an operation. // canQueue returns an indicator whether the peer can queue an operation.
@ -398,7 +398,7 @@ func (p *serverPeer) rejectUpdate(size uint64) bool {
// freeze processes Stop messages from the given server and set the status as // freeze processes Stop messages from the given server and set the status as
// frozen. // frozen.
func (p *serverPeer) freeze() { func (p *serverPeer) freeze() {
if atomic.CompareAndSwapUint32(&p.frozen, 0, 1) { if p.frozen.CompareAndSwap(false, true) {
p.sendQueue.Clear() p.sendQueue.Clear()
} }
} }
@ -406,7 +406,7 @@ func (p *serverPeer) freeze() {
// unfreeze processes Resume messages from the given server and set the status // unfreeze processes Resume messages from the given server and set the status
// as unfrozen. // as unfrozen.
func (p *serverPeer) unfreeze() { func (p *serverPeer) unfreeze() {
atomic.StoreUint32(&p.frozen, 0) p.frozen.Store(false)
} }
// sendRequest send a request to the server based on the given message type // sendRequest send a request to the server based on the given message type
@ -823,11 +823,11 @@ func (p *clientPeer) freeze() {
if p.version < lpv3 { if p.version < lpv3 {
// if Stop/Resume is not supported then just drop the peer after setting // if Stop/Resume is not supported then just drop the peer after setting
// its frozen status permanently // its frozen status permanently
atomic.StoreUint32(&p.frozen, 1) p.frozen.Store(true)
p.Peer.Disconnect(p2p.DiscUselessPeer) p.Peer.Disconnect(p2p.DiscUselessPeer)
return return
} }
if atomic.SwapUint32(&p.frozen, 1) == 0 { if !p.frozen.Swap(true) {
go func() { go func() {
p.sendStop() p.sendStop()
time.Sleep(freezeTimeBase + time.Duration(rand.Int63n(int64(freezeTimeRandom)))) time.Sleep(freezeTimeBase + time.Duration(rand.Int63n(int64(freezeTimeRandom))))
@ -840,7 +840,7 @@ func (p *clientPeer) freeze() {
time.Sleep(freezeCheckPeriod) time.Sleep(freezeCheckPeriod)
continue continue
} }
atomic.StoreUint32(&p.frozen, 0) p.frozen.Store(false)
p.sendResume(bufValue) p.sendResume(bufValue)
return return
} }

View File

@ -19,7 +19,6 @@ package les
import ( import (
"errors" "errors"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
@ -164,8 +163,8 @@ func (h *serverHandler) handle(p *clientPeer) error {
}() }()
// Mark the peer as being served. // Mark the peer as being served.
atomic.StoreUint32(&p.serving, 1) p.serving.Store(true)
defer atomic.StoreUint32(&p.serving, 0) defer p.serving.Store(false)
// Spawn a main loop to handle all incoming messages. // Spawn a main loop to handle all incoming messages.
for { for {

View File

@ -25,7 +25,6 @@ import (
"crypto/rand" "crypto/rand"
"fmt" "fmt"
"math/big" "math/big"
"sync/atomic"
"testing" "testing"
"time" "time"
@ -380,7 +379,7 @@ func newTestPeerPair(name string, version int, server *serverHandler, client *cl
return nil, nil, fmt.Errorf("failed to establish protocol connection %v", err) return nil, nil, fmt.Errorf("failed to establish protocol connection %v", err)
default: default:
} }
if atomic.LoadUint32(&peer1.serving) == 1 && atomic.LoadUint32(&peer2.serving) == 1 { if peer1.serving.Load() && peer2.serving.Load() {
break break
} }
time.Sleep(50 * time.Millisecond) time.Sleep(50 * time.Millisecond)
@ -441,7 +440,7 @@ func (client *testClient) newRawPeer(t *testing.T, name string, version int, rec
return nil, nil, nil return nil, nil, nil
default: default:
} }
if atomic.LoadUint32(&peer.serving) == 1 { if peer.serving.Load() {
break break
} }
time.Sleep(50 * time.Millisecond) time.Sleep(50 * time.Millisecond)
@ -505,7 +504,7 @@ func (server *testServer) newRawPeer(t *testing.T, name string, version int) (*t
return nil, nil, nil return nil, nil, nil
default: default:
} }
if atomic.LoadUint32(&peer.serving) == 1 { if peer.serving.Load() {
break break
} }
time.Sleep(50 * time.Millisecond) time.Sleep(50 * time.Millisecond)

View File

@ -20,7 +20,6 @@ import (
"crypto/rand" "crypto/rand"
"fmt" "fmt"
"net" "net"
"sync/atomic"
"testing" "testing"
"time" "time"
@ -136,7 +135,7 @@ func connect(server *serverHandler, serverId enode.ID, client *clientHandler, pr
return nil, nil, fmt.Errorf("failed to establish protocol connection %v", err) return nil, nil, fmt.Errorf("failed to establish protocol connection %v", err)
default: default:
} }
if atomic.LoadUint32(&peer1.serving) == 1 && atomic.LoadUint32(&peer2.serving) == 1 { if peer1.serving.Load() && peer2.serving.Load() {
break break
} }
time.Sleep(50 * time.Millisecond) time.Sleep(50 * time.Millisecond)