rpc: use atomic types (#27214)

rpc: use atomic type
This commit is contained in:
s7v7nislands 2023-05-04 16:54:45 +08:00 committed by GitHub
parent dde2da0efb
commit ffda2c64c4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 9 additions and 9 deletions

View File

@ -79,7 +79,7 @@ type Client struct {
isHTTP bool // connection type: http, ws or ipc isHTTP bool // connection type: http, ws or ipc
services *serviceRegistry services *serviceRegistry
idCounter uint32 idCounter atomic.Uint32
// This function, if non-nil, is called when the connection is lost. // This function, if non-nil, is called when the connection is lost.
reconnectFunc reconnectFunc reconnectFunc reconnectFunc
@ -263,7 +263,7 @@ func (c *Client) RegisterName(name string, receiver interface{}) error {
} }
func (c *Client) nextID() json.RawMessage { func (c *Client) nextID() json.RawMessage {
id := atomic.AddUint32(&c.idCounter, 1) id := c.idCounter.Add(1)
return strconv.AppendUint(nil, uint64(id), 10) return strconv.AppendUint(nil, uint64(id), 10)
} }

View File

@ -48,7 +48,7 @@ type Server struct {
mutex sync.Mutex mutex sync.Mutex
codecs map[ServerCodec]struct{} codecs map[ServerCodec]struct{}
run int32 run atomic.Bool
} }
// NewServer creates a new server instance with no registered handlers. // NewServer creates a new server instance with no registered handlers.
@ -56,8 +56,8 @@ func NewServer() *Server {
server := &Server{ server := &Server{
idgen: randomIDGenerator(), idgen: randomIDGenerator(),
codecs: make(map[ServerCodec]struct{}), codecs: make(map[ServerCodec]struct{}),
run: 1,
} }
server.run.Store(true)
// Register the default service providing meta information about the RPC service such // Register the default service providing meta information about the RPC service such
// as the services and methods it offers. // as the services and methods it offers.
rpcService := &RPCService{server} rpcService := &RPCService{server}
@ -95,7 +95,7 @@ func (s *Server) trackCodec(codec ServerCodec) bool {
s.mutex.Lock() s.mutex.Lock()
defer s.mutex.Unlock() defer s.mutex.Unlock()
if atomic.LoadInt32(&s.run) == 0 { if !s.run.Load() {
return false // Don't serve if server is stopped. return false // Don't serve if server is stopped.
} }
s.codecs[codec] = struct{}{} s.codecs[codec] = struct{}{}
@ -114,7 +114,7 @@ func (s *Server) untrackCodec(codec ServerCodec) {
// this mode. // this mode.
func (s *Server) serveSingleRequest(ctx context.Context, codec ServerCodec) { func (s *Server) serveSingleRequest(ctx context.Context, codec ServerCodec) {
// Don't serve if server is stopped. // Don't serve if server is stopped.
if atomic.LoadInt32(&s.run) == 0 { if !s.run.Load() {
return return
} }
@ -144,7 +144,7 @@ func (s *Server) Stop() {
s.mutex.Lock() s.mutex.Lock()
defer s.mutex.Unlock() defer s.mutex.Unlock()
if atomic.CompareAndSwapInt32(&s.run, 1, 0) { if s.run.CompareAndSwap(true, false) {
log.Debug("RPC server shutting down") log.Debug("RPC server shutting down")
for codec := range s.codecs { for codec := range s.codecs {
codec.close() codec.close()

View File

@ -25,7 +25,7 @@ import (
) )
type StdIOUI struct { type StdIOUI struct {
client rpc.Client client *rpc.Client
} }
func NewStdIOUI() *StdIOUI { func NewStdIOUI() *StdIOUI {
@ -33,7 +33,7 @@ func NewStdIOUI() *StdIOUI {
if err != nil { if err != nil {
log.Crit("Could not create stdio client", "err", err) log.Crit("Could not create stdio client", "err", err)
} }
ui := &StdIOUI{client: *client} ui := &StdIOUI{client: client}
return ui return ui
} }