forked from cerc-io/plugeth
p2p/protocols: fix data race in TestProtocolHook (#19242)
dummyHook's fields were concurrently written by nodes and read by
the test. The simplest solution is to protect all fields with a mutex.
Enable: TestMultiplePeersDropSelf, TestMultiplePeersDropOther as they
seemingly accidentally stayed disabled during a refactor/rewrite
since 1836366ac1
.
resolves ethersphere/go-ethereum#1286
This commit is contained in:
parent
bb55b0fb53
commit
f82185a4a1
@ -161,7 +161,7 @@ func TestAccountingSimulation(t *testing.T) {
|
|||||||
type matrix struct {
|
type matrix struct {
|
||||||
n int //number of nodes
|
n int //number of nodes
|
||||||
m []int64 //array of balances
|
m []int64 //array of balances
|
||||||
lock sync.RWMutex
|
lock sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
// create a new matrix
|
// create a new matrix
|
||||||
|
@ -21,6 +21,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -172,8 +173,11 @@ func protoHandshakeExchange(id enode.ID, proto *protoHandshake) []p2ptest.Exchan
|
|||||||
}
|
}
|
||||||
|
|
||||||
func runProtoHandshake(t *testing.T, proto *protoHandshake, errs ...error) {
|
func runProtoHandshake(t *testing.T, proto *protoHandshake, errs ...error) {
|
||||||
|
t.Helper()
|
||||||
pp := p2ptest.NewTestPeerPool()
|
pp := p2ptest.NewTestPeerPool()
|
||||||
s := protocolTester(pp)
|
s := protocolTester(pp)
|
||||||
|
defer s.Stop()
|
||||||
|
|
||||||
// TODO: make this more than one handshake
|
// TODO: make this more than one handshake
|
||||||
node := s.Nodes[0]
|
node := s.Nodes[0]
|
||||||
if err := s.TestExchanges(protoHandshakeExchange(node.ID(), proto)...); err != nil {
|
if err := s.TestExchanges(protoHandshakeExchange(node.ID(), proto)...); err != nil {
|
||||||
@ -195,6 +199,7 @@ type dummyHook struct {
|
|||||||
send bool
|
send bool
|
||||||
err error
|
err error
|
||||||
waitC chan struct{}
|
waitC chan struct{}
|
||||||
|
mu sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
type dummyMsg struct {
|
type dummyMsg struct {
|
||||||
@ -202,6 +207,9 @@ type dummyMsg struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (d *dummyHook) Send(peer *Peer, size uint32, msg interface{}) error {
|
func (d *dummyHook) Send(peer *Peer, size uint32, msg interface{}) error {
|
||||||
|
d.mu.Lock()
|
||||||
|
defer d.mu.Unlock()
|
||||||
|
|
||||||
d.peer = peer
|
d.peer = peer
|
||||||
d.size = size
|
d.size = size
|
||||||
d.msg = msg
|
d.msg = msg
|
||||||
@ -210,6 +218,9 @@ func (d *dummyHook) Send(peer *Peer, size uint32, msg interface{}) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (d *dummyHook) Receive(peer *Peer, size uint32, msg interface{}) error {
|
func (d *dummyHook) Receive(peer *Peer, size uint32, msg interface{}) error {
|
||||||
|
d.mu.Lock()
|
||||||
|
defer d.mu.Unlock()
|
||||||
|
|
||||||
d.peer = peer
|
d.peer = peer
|
||||||
d.size = size
|
d.size = size
|
||||||
d.msg = msg
|
d.msg = msg
|
||||||
@ -263,6 +274,7 @@ func TestProtocolHook(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
testHook.mu.Lock()
|
||||||
if testHook.msg == nil || testHook.msg.(*dummyMsg).Content != "handshake" {
|
if testHook.msg == nil || testHook.msg.(*dummyMsg).Content != "handshake" {
|
||||||
t.Fatal("Expected msg to be set, but it is not")
|
t.Fatal("Expected msg to be set, but it is not")
|
||||||
}
|
}
|
||||||
@ -278,6 +290,7 @@ func TestProtocolHook(t *testing.T) {
|
|||||||
if testHook.size != 11 { //11 is the length of the encoded message
|
if testHook.size != 11 { //11 is the length of the encoded message
|
||||||
t.Fatalf("Expected size to be %d, but it is %d ", 1, testHook.size)
|
t.Fatalf("Expected size to be %d, but it is %d ", 1, testHook.size)
|
||||||
}
|
}
|
||||||
|
testHook.mu.Unlock()
|
||||||
|
|
||||||
err = tester.TestExchanges(p2ptest.Exchange{
|
err = tester.TestExchanges(p2ptest.Exchange{
|
||||||
Triggers: []p2ptest.Trigger{
|
Triggers: []p2ptest.Trigger{
|
||||||
@ -294,6 +307,8 @@ func TestProtocolHook(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
testHook.mu.Lock()
|
||||||
if testHook.msg == nil || testHook.msg.(*dummyMsg).Content != "response" {
|
if testHook.msg == nil || testHook.msg.(*dummyMsg).Content != "response" {
|
||||||
t.Fatal("Expected msg to be set, but it is not")
|
t.Fatal("Expected msg to be set, but it is not")
|
||||||
}
|
}
|
||||||
@ -306,6 +321,7 @@ func TestProtocolHook(t *testing.T) {
|
|||||||
if testHook.size != 10 { //11 is the length of the encoded message
|
if testHook.size != 10 { //11 is the length of the encoded message
|
||||||
t.Fatalf("Expected size to be %d, but it is %d ", 1, testHook.size)
|
t.Fatalf("Expected size to be %d, but it is %d ", 1, testHook.size)
|
||||||
}
|
}
|
||||||
|
testHook.mu.Unlock()
|
||||||
|
|
||||||
testHook.err = fmt.Errorf("dummy error")
|
testHook.err = fmt.Errorf("dummy error")
|
||||||
err = tester.TestExchanges(p2ptest.Exchange{
|
err = tester.TestExchanges(p2ptest.Exchange{
|
||||||
@ -325,7 +341,6 @@ func TestProtocolHook(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Expected a specific disconnect error, but got different one: %v", err)
|
t.Fatalf("Expected a specific disconnect error, but got different one: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
//We need to test that if the hook is not defined, then message infrastructure
|
//We need to test that if the hook is not defined, then message infrastructure
|
||||||
@ -342,16 +357,19 @@ func TestNoHook(t *testing.T) {
|
|||||||
ctx := context.TODO()
|
ctx := context.TODO()
|
||||||
msg := &perBytesMsgSenderPays{Content: "testBalance"}
|
msg := &perBytesMsgSenderPays{Content: "testBalance"}
|
||||||
//send a message
|
//send a message
|
||||||
err := peer.Send(ctx, msg)
|
|
||||||
if err != nil {
|
if err := peer.Send(ctx, msg); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
//simulate receiving a message
|
//simulate receiving a message
|
||||||
rw.msg = msg
|
rw.msg = msg
|
||||||
peer.handleIncoming(func(ctx context.Context, msg interface{}) error {
|
handler := func(ctx context.Context, msg interface{}) error {
|
||||||
return nil
|
return nil
|
||||||
})
|
}
|
||||||
//all should just work and not result in any error
|
|
||||||
|
if err := peer.handleIncoming(handler); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestProtoHandshakeVersionMismatch(t *testing.T) {
|
func TestProtoHandshakeVersionMismatch(t *testing.T) {
|
||||||
@ -391,8 +409,11 @@ func moduleHandshakeExchange(id enode.ID, resp uint) []p2ptest.Exchange {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func runModuleHandshake(t *testing.T, resp uint, errs ...error) {
|
func runModuleHandshake(t *testing.T, resp uint, errs ...error) {
|
||||||
|
t.Helper()
|
||||||
pp := p2ptest.NewTestPeerPool()
|
pp := p2ptest.NewTestPeerPool()
|
||||||
s := protocolTester(pp)
|
s := protocolTester(pp)
|
||||||
|
defer s.Stop()
|
||||||
|
|
||||||
node := s.Nodes[0]
|
node := s.Nodes[0]
|
||||||
if err := s.TestExchanges(protoHandshakeExchange(node.ID(), &protoHandshake{42, "420"})...); err != nil {
|
if err := s.TestExchanges(protoHandshakeExchange(node.ID(), &protoHandshake{42, "420"})...); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
@ -471,8 +492,10 @@ func testMultiPeerSetup(a, b enode.ID) []p2ptest.Exchange {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func runMultiplePeers(t *testing.T, peer int, errs ...error) {
|
func runMultiplePeers(t *testing.T, peer int, errs ...error) {
|
||||||
|
t.Helper()
|
||||||
pp := p2ptest.NewTestPeerPool()
|
pp := p2ptest.NewTestPeerPool()
|
||||||
s := protocolTester(pp)
|
s := protocolTester(pp)
|
||||||
|
defer s.Stop()
|
||||||
|
|
||||||
if err := s.TestExchanges(testMultiPeerSetup(s.Nodes[0].ID(), s.Nodes[1].ID())...); err != nil {
|
if err := s.TestExchanges(testMultiPeerSetup(s.Nodes[0].ID(), s.Nodes[1].ID())...); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
@ -542,14 +565,14 @@ WAIT:
|
|||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
func XTestMultiplePeersDropSelf(t *testing.T) {
|
func TestMultiplePeersDropSelf(t *testing.T) {
|
||||||
runMultiplePeers(t, 0,
|
runMultiplePeers(t, 0,
|
||||||
fmt.Errorf("subprotocol error"),
|
fmt.Errorf("subprotocol error"),
|
||||||
fmt.Errorf("Message handler error: (msg code 3): dropped"),
|
fmt.Errorf("Message handler error: (msg code 3): dropped"),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
func XTestMultiplePeersDropOther(t *testing.T) {
|
func TestMultiplePeersDropOther(t *testing.T) {
|
||||||
runMultiplePeers(t, 1,
|
runMultiplePeers(t, 1,
|
||||||
fmt.Errorf("Message handler error: (msg code 3): dropped"),
|
fmt.Errorf("Message handler error: (msg code 3): dropped"),
|
||||||
fmt.Errorf("subprotocol error"),
|
fmt.Errorf("subprotocol error"),
|
||||||
|
@ -51,7 +51,7 @@ type ProtocolTester struct {
|
|||||||
// NewProtocolTester constructs a new ProtocolTester
|
// NewProtocolTester constructs a new ProtocolTester
|
||||||
// it takes as argument the pivot node id, the number of dummy peers and the
|
// it takes as argument the pivot node id, the number of dummy peers and the
|
||||||
// protocol run function called on a peer connection by the p2p server
|
// protocol run function called on a peer connection by the p2p server
|
||||||
func NewProtocolTester(id enode.ID, n int, run func(*p2p.Peer, p2p.MsgReadWriter) error) *ProtocolTester {
|
func NewProtocolTester(id enode.ID, nodeCount int, run func(*p2p.Peer, p2p.MsgReadWriter) error) *ProtocolTester {
|
||||||
services := adapters.Services{
|
services := adapters.Services{
|
||||||
"test": func(ctx *adapters.ServiceContext) (node.Service, error) {
|
"test": func(ctx *adapters.ServiceContext) (node.Service, error) {
|
||||||
return &testNode{run}, nil
|
return &testNode{run}, nil
|
||||||
@ -74,9 +74,9 @@ func NewProtocolTester(id enode.ID, n int, run func(*p2p.Peer, p2p.MsgReadWriter
|
|||||||
}
|
}
|
||||||
|
|
||||||
node := net.GetNode(id).Node.(*adapters.SimNode)
|
node := net.GetNode(id).Node.(*adapters.SimNode)
|
||||||
peers := make([]*adapters.NodeConfig, n)
|
peers := make([]*adapters.NodeConfig, nodeCount)
|
||||||
nodes := make([]*enode.Node, n)
|
nodes := make([]*enode.Node, nodeCount)
|
||||||
for i := 0; i < n; i++ {
|
for i := 0; i < nodeCount; i++ {
|
||||||
peers[i] = adapters.RandomNodeConfig()
|
peers[i] = adapters.RandomNodeConfig()
|
||||||
peers[i].Services = []string{"mock"}
|
peers[i].Services = []string{"mock"}
|
||||||
nodes[i] = peers[i].Node()
|
nodes[i] = peers[i].Node()
|
||||||
@ -100,9 +100,8 @@ func NewProtocolTester(id enode.ID, n int, run func(*p2p.Peer, p2p.MsgReadWriter
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Stop stops the p2p server
|
// Stop stops the p2p server
|
||||||
func (t *ProtocolTester) Stop() error {
|
func (t *ProtocolTester) Stop() {
|
||||||
t.Server.Stop()
|
t.Server.Stop()
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Connect brings up the remote peer node and connects it using the
|
// Connect brings up the remote peer node and connects it using the
|
||||||
|
Loading…
Reference in New Issue
Block a user