p2p: Wrap conn.flags ops with atomic.Load/Store

This commit is contained in:
Andrey Petrov 2018-06-07 21:50:08 -04:00
parent 193a402cc0
commit 6209545083
3 changed files with 28 additions and 20 deletions

View File

@ -95,11 +95,10 @@ type PeerEvent struct {
// Peer represents a connected remote node. // Peer represents a connected remote node.
type Peer struct { type Peer struct {
rw *conn rw *conn
isInbound bool // Cached from rw.flags to avoid a race condition running map[string]*protoRW
running map[string]*protoRW log log.Logger
log log.Logger created mclock.AbsTime
created mclock.AbsTime
wg sync.WaitGroup wg sync.WaitGroup
protoErr chan error protoErr chan error
@ -161,20 +160,19 @@ func (p *Peer) String() string {
// Inbound returns true if the peer is an inbound connection // Inbound returns true if the peer is an inbound connection
func (p *Peer) Inbound() bool { func (p *Peer) Inbound() bool {
return p.isInbound return p.rw.is(inboundConn)
} }
func newPeer(conn *conn, protocols []Protocol) *Peer { func newPeer(conn *conn, protocols []Protocol) *Peer {
protomap := matchProtocols(protocols, conn.caps, conn) protomap := matchProtocols(protocols, conn.caps, conn)
p := &Peer{ p := &Peer{
rw: conn, rw: conn,
isInbound: conn.is(inboundConn), running: protomap,
running: protomap, created: mclock.Now(),
created: mclock.Now(), disc: make(chan DiscReason),
disc: make(chan DiscReason), protoErr: make(chan error, len(protomap)+1), // protocols + pingLoop
protoErr: make(chan error, len(protomap)+1), // protocols + pingLoop closed: make(chan struct{}),
closed: make(chan struct{}), log: log.New("id", conn.id, "conn", conn.flags),
log: log.New("id", conn.id, "conn", conn.flags),
} }
return p return p
} }

View File

@ -23,6 +23,7 @@ import (
"fmt" "fmt"
"net" "net"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
@ -187,7 +188,7 @@ type peerDrop struct {
requested bool // true if signaled by the peer requested bool // true if signaled by the peer
} }
type connFlag int type connFlag int32
const ( const (
dynDialedConn connFlag = 1 << iota dynDialedConn connFlag = 1 << iota
@ -252,7 +253,18 @@ func (f connFlag) String() string {
} }
func (c *conn) is(f connFlag) bool { func (c *conn) is(f connFlag) bool {
return c.flags&f != 0 flags := connFlag(atomic.LoadInt32((*int32)(&c.flags)))
return flags&f != 0
}
func (c *conn) set(f connFlag, val bool) {
flags := connFlag(atomic.LoadInt32((*int32)(&c.flags)))
if val {
flags |= f
} else {
flags &= ^f
}
atomic.StoreInt32((*int32)(&c.flags), int32(flags))
} }
// Peers returns all connected peers. // Peers returns all connected peers.
@ -632,7 +644,7 @@ running:
trusted[n.ID] = true trusted[n.ID] = true
// Mark any already-connected peer as trusted // Mark any already-connected peer as trusted
if p, ok := peers[n.ID]; ok { if p, ok := peers[n.ID]; ok {
p.rw.flags |= trustedConn p.rw.set(trustedConn, true)
} }
case n := <-srv.removetrusted: case n := <-srv.removetrusted:
// This channel is used by RemoveTrustedPeer to remove an enode // This channel is used by RemoveTrustedPeer to remove an enode
@ -643,7 +655,7 @@ running:
} }
// Unmark any already-connected peer as trusted // Unmark any already-connected peer as trusted
if p, ok := peers[n.ID]; ok { if p, ok := peers[n.ID]; ok {
p.rw.flags &= ^trustedConn p.rw.set(trustedConn, false)
} }
case op := <-srv.peerOp: case op := <-srv.peerOp:
// This channel is used by Peers and PeerCount. // This channel is used by Peers and PeerCount.

View File

@ -189,12 +189,10 @@ func TestServerDial(t *testing.T) {
} }
done <- true done <- true
}() }()
// Trigger potential race conditions // Trigger potential race conditions
peer = srv.Peers()[0] peer = srv.Peers()[0]
_ = peer.Inbound() _ = peer.Inbound()
_ = peer.Info() _ = peer.Info()
<-done <-done
case <-time.After(1 * time.Second): case <-time.After(1 * time.Second):
t.Error("server did not launch peer within one second") t.Error("server did not launch peer within one second")