swarm/network: fix data races in TestInitialPeersMsg test (#19490)

* swarm/network: fix data races in TestInitialPeersMsg test

* swarm/network: add Kademlia.Saturation method with lock

* swarm/network: add Hive.Peer method to safely retrieve a bzz peer

* swarm/network: remove duplicate comment

* p2p/testing: prevent goroutine leak in ProtocolTester

* swarm/network: fix data race in newBzzBaseTesterWithAddrs

* swarm/network: fix goroutone leaks in testInitialPeersMsg

* swarm/network: raise number of peer check attempts in testInitialPeersMsg

* swarm/network: use Hive.Peer in Hive.PeerInfo function

* swarm/network: reduce the scope of mutex lock in newBzzBaseTesterWithAddrs

* swarm/storage: disable TestCleanIndex with race detector
This commit is contained in:
Janoš Guljaš 2019-04-25 21:33:18 +02:00 committed by Viktor Trón
parent 92a849a509
commit 3873a7314d
6 changed files with 37 additions and 10 deletions

View File

@ -110,6 +110,7 @@ func NewProtocolTester(prvkey *ecdsa.PrivateKey, nodeCount int, run func(*p2p.Pe
// Stop stops the p2p server // Stop stops the p2p server
func (t *ProtocolTester) Stop() { func (t *ProtocolTester) Stop() {
t.Server.Stop() t.Server.Stop()
t.network.Shutdown()
} }
// Connect brings up the remote peer node and connects it using the // Connect brings up the remote peer node and connects it using the

View File

@ -154,13 +154,15 @@ func testInitialPeersMsg(t *testing.T, peerPO, peerDepth int) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
defer s.Stop()
// peerID to use in the protocol tester testExchange expect/trigger // peerID to use in the protocol tester testExchange expect/trigger
peerID := s.Nodes[0].ID() peerID := s.Nodes[0].ID()
// block until control peer is found among hive peers // block until control peer is found among hive peers
found := false found := false
for attempts := 0; attempts < 20; attempts++ { for attempts := 0; attempts < 2000; attempts++ {
if _, found = hive.peers[peerID]; found { found = hive.Peer(peerID) != nil
if found {
break break
} }
time.Sleep(1 * time.Millisecond) time.Sleep(1 * time.Millisecond)
@ -171,7 +173,7 @@ func testInitialPeersMsg(t *testing.T, peerPO, peerDepth int) {
} }
// pivotDepth is the advertised depth of the pivot node we expect in the outgoing subPeersMsg // pivotDepth is the advertised depth of the pivot node we expect in the outgoing subPeersMsg
pivotDepth := hive.saturation() pivotDepth := hive.Saturation()
// the test exchange is as follows: // the test exchange is as follows:
// 1. pivot sends to the control peer a `subPeersMsg` advertising its depth (ignored) // 1. pivot sends to the control peer a `subPeersMsg` advertising its depth (ignored)
// 2. peer sends to pivot a `subPeersMsg` advertising its own depth (arbitrarily chosen) // 2. peer sends to pivot a `subPeersMsg` advertising its own depth (arbitrarily chosen)

View File

@ -192,9 +192,7 @@ func (h *Hive) NodeInfo() interface{} {
// PeerInfo function is used by the p2p.server RPC interface to display // PeerInfo function is used by the p2p.server RPC interface to display
// protocol specific information any connected peer referred to by their NodeID // protocol specific information any connected peer referred to by their NodeID
func (h *Hive) PeerInfo(id enode.ID) interface{} { func (h *Hive) PeerInfo(id enode.ID) interface{} {
h.lock.Lock() p := h.Peer(id)
p := h.peers[id]
h.lock.Unlock()
if p == nil { if p == nil {
return nil return nil
@ -209,6 +207,15 @@ func (h *Hive) PeerInfo(id enode.ID) interface{} {
} }
} }
// Peer returns a bzz peer from the Hive. If there is no peer
// with the provided enode id, a nil value is returned.
func (h *Hive) Peer(id enode.ID) *BzzPeer {
h.lock.Lock()
defer h.lock.Unlock()
return h.peers[id]
}
// loadPeers, savePeer implement persistence callback/ // loadPeers, savePeer implement persistence callback/
func (h *Hive) loadPeers() error { func (h *Hive) loadPeers() error {
var as []*BzzAddr var as []*BzzAddr

View File

@ -735,8 +735,15 @@ func NewPeerPotMap(neighbourhoodSize int, addrs [][]byte) map[string]*PeerPot {
return ppmap return ppmap
} }
// saturation returns the smallest po value in which the node has less than MinBinSize peers // Saturation returns the smallest po value in which the node has less than MinBinSize peers
// if the iterator reaches neighbourhood radius, then the last bin + 1 is returned // if the iterator reaches neighbourhood radius, then the last bin + 1 is returned
func (k *Kademlia) Saturation() int {
k.lock.RLock()
defer k.lock.RUnlock()
return k.saturation()
}
func (k *Kademlia) saturation() int { func (k *Kademlia) saturation() int {
prev := -1 prev := -1
radius := neighbourhoodRadiusForPot(k.conns, k.NeighbourhoodSize, k.base) radius := neighbourhoodRadiusForPot(k.conns, k.NeighbourhoodSize, k.base)

View File

@ -86,9 +86,12 @@ func newBzzBaseTester(n int, prvkey *ecdsa.PrivateKey, spec *protocols.Spec, run
func newBzzBaseTesterWithAddrs(prvkey *ecdsa.PrivateKey, addrs [][]byte, spec *protocols.Spec, run func(*BzzPeer) error) (*bzzTester, [][]byte, error) { func newBzzBaseTesterWithAddrs(prvkey *ecdsa.PrivateKey, addrs [][]byte, spec *protocols.Spec, run func(*BzzPeer) error) (*bzzTester, [][]byte, error) {
n := len(addrs) n := len(addrs)
cs := make(map[enode.ID]chan bool) cs := make(map[enode.ID]chan bool)
var csMu sync.Mutex
srv := func(p *BzzPeer) error { srv := func(p *BzzPeer) error {
defer func() { defer func() {
csMu.Lock()
defer csMu.Unlock()
if cs[p.ID()] != nil { if cs[p.ID()] != nil {
close(cs[p.ID()]) close(cs[p.ID()])
} }
@ -99,8 +102,8 @@ func newBzzBaseTesterWithAddrs(prvkey *ecdsa.PrivateKey, addrs [][]byte, spec *p
nodeToAddr := make(map[enode.ID][]byte) nodeToAddr := make(map[enode.ID][]byte)
protocol := func(p *p2p.Peer, rw p2p.MsgReadWriter) error { protocol := func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
mu.Lock() mu.Lock()
defer mu.Unlock()
nodeToAddr[p.ID()] = addrs[0] nodeToAddr[p.ID()] = addrs[0]
mu.Unlock()
bzzAddr := &BzzAddr{addrs[0], []byte(p.Node().String())} bzzAddr := &BzzAddr{addrs[0], []byte(p.Node().String())}
addrs = addrs[1:] addrs = addrs[1:]
return srv(&BzzPeer{Peer: protocols.NewPeer(p, rw, spec), BzzAddr: bzzAddr}) return srv(&BzzPeer{Peer: protocols.NewPeer(p, rw, spec), BzzAddr: bzzAddr})
@ -120,10 +123,12 @@ func newBzzBaseTesterWithAddrs(prvkey *ecdsa.PrivateKey, addrs [][]byte, spec *p
} }
addr := getENRBzzAddr(nod) addr := getENRBzzAddr(nod)
csMu.Lock()
for _, node := range s.Nodes { for _, node := range s.Nodes {
log.Warn("node", "node", node) log.Warn("node", "node", node)
cs[node.ID()] = make(chan bool) cs[node.ID()] = make(chan bool)
} }
csMu.Unlock()
var nodeAddrs [][]byte var nodeAddrs [][]byte
pt := &bzzTester{ pt := &bzzTester{
@ -131,9 +136,11 @@ func newBzzBaseTesterWithAddrs(prvkey *ecdsa.PrivateKey, addrs [][]byte, spec *p
ProtocolTester: s, ProtocolTester: s,
cs: cs, cs: cs,
} }
mu.Lock()
for _, n := range pt.Nodes { for _, n := range pt.Nodes {
nodeAddrs = append(nodeAddrs, nodeToAddr[n.ID()]) nodeAddrs = append(nodeAddrs, nodeToAddr[n.ID()])
} }
mu.Unlock()
return pt, nodeAddrs, nil return pt, nodeAddrs, nil
} }

View File

@ -27,12 +27,11 @@ import (
"strings" "strings"
"testing" "testing"
"github.com/ethereum/go-ethereum/swarm/testutil"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/log" "github.com/ethereum/go-ethereum/swarm/log"
"github.com/ethereum/go-ethereum/swarm/storage/mock/mem" "github.com/ethereum/go-ethereum/swarm/storage/mock/mem"
"github.com/ethereum/go-ethereum/swarm/testutil"
ldberrors "github.com/syndtr/goleveldb/leveldb/errors" ldberrors "github.com/syndtr/goleveldb/leveldb/errors"
) )
@ -606,6 +605,10 @@ func TestLDBStoreCollectGarbageAccessUnlikeIndex(t *testing.T) {
} }
func TestCleanIndex(t *testing.T) { func TestCleanIndex(t *testing.T) {
if testutil.RaceEnabled {
t.Skip("disabled because it times out with race detector")
}
capacity := 5000 capacity := 5000
n := 3 n := 3