cmd/devp2p: fix flaky SameRequestID test (#22754)

This commit is contained in:
rene 2021-04-28 21:38:38 +02:00 committed by GitHub
parent 6d7c9566df
commit e4270cacf4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 78 additions and 31 deletions

View File

@ -76,9 +76,14 @@ func (s *Suite) TestGetBlockHeaders_66(t *utesting.T) {
}, },
} }
// write message // write message
headers := s.getBlockHeaders66(t, conn, req, req.RequestId) headers, err := s.getBlockHeaders66(conn, req, req.RequestId)
if err != nil {
t.Fatalf("could not get block headers: %v", err)
}
// check for correct headers // check for correct headers
headersMatch(t, s.chain, headers) if !headersMatch(t, s.chain, headers) {
t.Fatal("received wrong header(s)")
}
} }
// TestSimultaneousRequests_66 sends two simultaneous `GetBlockHeader` requests // TestSimultaneousRequests_66 sends two simultaneous `GetBlockHeader` requests
@ -115,12 +120,25 @@ func (s *Suite) TestSimultaneousRequests_66(t *utesting.T) {
// wait for headers for first request // wait for headers for first request
headerChan := make(chan BlockHeaders, 1) headerChan := make(chan BlockHeaders, 1)
go func(headers chan BlockHeaders) { go func(headers chan BlockHeaders) {
headers <- s.getBlockHeaders66(t, conn1, req1, req1.RequestId) recvHeaders, err := s.getBlockHeaders66(conn1, req1, req1.RequestId)
if err != nil {
t.Fatalf("could not get block headers: %v", err)
return
}
headers <- recvHeaders
}(headerChan) }(headerChan)
// check headers of second request // check headers of second request
headersMatch(t, s.chain, s.getBlockHeaders66(t, conn2, req2, req2.RequestId)) headers1, err := s.getBlockHeaders66(conn2, req2, req2.RequestId)
if err != nil {
t.Fatalf("could not get block headers: %v", err)
}
if !headersMatch(t, s.chain, headers1) {
t.Fatal("wrong header(s) in response to req2")
}
// check headers of first request // check headers of first request
headersMatch(t, s.chain, <-headerChan) if !headersMatch(t, s.chain, <-headerChan) {
t.Fatal("wrong header(s) in response to req1")
}
} }
// TestBroadcast_66 tests whether a block announcement is correctly // TestBroadcast_66 tests whether a block announcement is correctly
@ -377,26 +395,31 @@ func (s *Suite) TestZeroRequestID_66(t *utesting.T) {
Amount: 2, Amount: 2,
}, },
} }
headersMatch(t, s.chain, s.getBlockHeaders66(t, conn, req, req.RequestId)) headers, err := s.getBlockHeaders66(conn, req, req.RequestId)
if err != nil {
t.Fatalf("could not get block headers: %v", err)
}
if !headersMatch(t, s.chain, headers) {
t.Fatal("received wrong header(s)")
}
} }
// TestSameRequestID_66 sends two requests with the same request ID // TestSameRequestID_66 sends two requests with the same request ID
// concurrently to a single node. // concurrently to a single node.
func (s *Suite) TestSameRequestID_66(t *utesting.T) { func (s *Suite) TestSameRequestID_66(t *utesting.T) {
conn := s.setupConnection66(t) conn := s.setupConnection66(t)
defer conn.Close() // create two requests with the same request ID
// create two separate requests with same ID
reqID := uint64(1234) reqID := uint64(1234)
req1 := &eth.GetBlockHeadersPacket66{ request1 := &eth.GetBlockHeadersPacket66{
RequestId: reqID, RequestId: reqID,
GetBlockHeadersPacket: &eth.GetBlockHeadersPacket{ GetBlockHeadersPacket: &eth.GetBlockHeadersPacket{
Origin: eth.HashOrNumber{ Origin: eth.HashOrNumber{
Number: 0, Number: 1,
}, },
Amount: 2, Amount: 2,
}, },
} }
req2 := &eth.GetBlockHeadersPacket66{ request2 := &eth.GetBlockHeadersPacket66{
RequestId: reqID, RequestId: reqID,
GetBlockHeadersPacket: &eth.GetBlockHeadersPacket{ GetBlockHeadersPacket: &eth.GetBlockHeadersPacket{
Origin: eth.HashOrNumber{ Origin: eth.HashOrNumber{
@ -405,12 +428,26 @@ func (s *Suite) TestSameRequestID_66(t *utesting.T) {
Amount: 2, Amount: 2,
}, },
} }
// send requests concurrently // write the first request
go func() { err := conn.write66(request1, GetBlockHeaders{}.Code())
headersMatch(t, s.chain, s.getBlockHeaders66(t, conn, req2, reqID)) if err != nil {
}() t.Fatalf("could not write to connection: %v", err)
// check response from first request }
headersMatch(t, s.chain, s.getBlockHeaders66(t, conn, req1, reqID)) // perform second request
headers2, err := s.getBlockHeaders66(conn, request2, reqID)
if err != nil {
t.Fatalf("could not get block headers: %v", err)
return
}
// wait for response to first request
headers1, err := s.waitForBlockHeadersResponse66(conn, reqID)
if err != nil {
t.Fatalf("could not get BlockHeaders response: %v", err)
}
// check if headers match
if !headersMatch(t, s.chain, headers1) || !headersMatch(t, s.chain, headers2) {
t.Fatal("received wrong header(s)")
}
} }
// TestLargeTxRequest_66 tests whether a node can fulfill a large GetPooledTransactions // TestLargeTxRequest_66 tests whether a node can fulfill a large GetPooledTransactions

View File

@ -18,6 +18,7 @@ package ethtest
import ( import (
"fmt" "fmt"
"reflect"
"time" "time"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
@ -150,8 +151,7 @@ func (c *Conn) waitForResponse(chain *Chain, timeout time.Duration, requestID ui
func (c *Conn) readAndServe66(chain *Chain, timeout time.Duration) (uint64, Message) { func (c *Conn) readAndServe66(chain *Chain, timeout time.Duration) (uint64, Message) {
start := time.Now() start := time.Now()
for time.Since(start) < timeout { for time.Since(start) < timeout {
timeout := time.Now().Add(10 * time.Second) c.SetReadDeadline(time.Now().Add(10 * time.Second))
c.SetReadDeadline(timeout)
reqID, msg := c.read66() reqID, msg := c.read66()
@ -257,6 +257,9 @@ func (c *Conn) waitForBlock66(block *types.Block) error {
return nil return nil
} }
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
case *NewPooledTransactionHashes:
// ignore old announcements
continue
default: default:
return fmt.Errorf("invalid message: %s", pretty.Sdump(msg)) return fmt.Errorf("invalid message: %s", pretty.Sdump(msg))
} }
@ -269,32 +272,39 @@ func sendSuccessfulTx66(t *utesting.T, s *Suite, tx *types.Transaction) {
sendSuccessfulTxWithConn(t, s, tx, sendConn) sendSuccessfulTxWithConn(t, s, tx, sendConn)
} }
func (s *Suite) getBlockHeaders66(t *utesting.T, conn *Conn, req eth.Packet, expectedID uint64) BlockHeaders { // waitForBlockHeadersResponse66 waits for a BlockHeaders message with the given expected request ID
if err := conn.write66(req, GetBlockHeaders{}.Code()); err != nil { func (s *Suite) waitForBlockHeadersResponse66(conn *Conn, expectedID uint64) (BlockHeaders, error) {
t.Fatalf("could not write to connection: %v", err)
}
// check block headers response
reqID, msg := conn.readAndServe66(s.chain, timeout) reqID, msg := conn.readAndServe66(s.chain, timeout)
switch msg := msg.(type) { switch msg := msg.(type) {
case BlockHeaders: case BlockHeaders:
if reqID != expectedID { if reqID != expectedID {
t.Fatalf("request ID mismatch: wanted %d, got %d", expectedID, reqID) return nil, fmt.Errorf("request ID mismatch: wanted %d, got %d", expectedID, reqID)
} }
return msg return msg, nil
default: default:
t.Fatalf("unexpected: %s", pretty.Sdump(msg)) return nil, fmt.Errorf("unexpected: %s", pretty.Sdump(msg))
return nil
} }
} }
func headersMatch(t *utesting.T, chain *Chain, headers BlockHeaders) { func (s *Suite) getBlockHeaders66(conn *Conn, req eth.Packet, expectedID uint64) (BlockHeaders, error) {
if err := conn.write66(req, GetBlockHeaders{}.Code()); err != nil {
return nil, fmt.Errorf("could not write to connection: %v", err)
}
return s.waitForBlockHeadersResponse66(conn, expectedID)
}
func headersMatch(t *utesting.T, chain *Chain, headers BlockHeaders) bool {
mismatched := 0
for _, header := range headers { for _, header := range headers {
num := header.Number.Uint64() num := header.Number.Uint64()
t.Logf("received header (%d): %s", num, pretty.Sdump(header.Hash())) t.Logf("received header (%d): %s", num, pretty.Sdump(header.Hash()))
assert.Equal(t, chain.blocks[int(num)].Header(), header) if !reflect.DeepEqual(chain.blocks[int(num)].Header(), header) {
mismatched += 1
t.Logf("received wrong header: %v", pretty.Sdump(header))
} }
} }
return mismatched == 0
}
func (s *Suite) sendNextBlock66(t *utesting.T) { func (s *Suite) sendNextBlock66(t *utesting.T) {
sendConn, receiveConn := s.setupConnection66(t), s.setupConnection66(t) sendConn, receiveConn := s.setupConnection66(t), s.setupConnection66(t)