diff --git a/p2p/simulations/adapters/exec.go b/p2p/simulations/adapters/exec.go index 0dc2ee21d..7c6ec9462 100644 --- a/p2p/simulations/adapters/exec.go +++ b/p2p/simulations/adapters/exec.go @@ -19,7 +19,6 @@ package adapters import ( "bytes" "context" - "crypto/ecdsa" "encoding/json" "errors" "fmt" @@ -146,7 +145,6 @@ type ExecNode struct { client *rpc.Client wsAddr string newCmd func() *exec.Cmd - key *ecdsa.PrivateKey } // Addr returns the node's enode URL diff --git a/p2p/simulations/adapters/inproc_test.go b/p2p/simulations/adapters/inproc_test.go index bd2c70b05..2a61508fe 100644 --- a/p2p/simulations/adapters/inproc_test.go +++ b/p2p/simulations/adapters/inproc_test.go @@ -20,8 +20,8 @@ import ( "bytes" "encoding/binary" "fmt" + "sync" "testing" - "time" "github.com/ethereum/go-ethereum/p2p/simulations/pipes" ) @@ -32,42 +32,26 @@ func TestTCPPipe(t *testing.T) { t.Fatal(err) } - done := make(chan struct{}) - - go func() { - msgs := 50 - size := 1024 - for i := 0; i < msgs; i++ { - msg := make([]byte, size) - _ = binary.PutUvarint(msg, uint64(i)) - - _, err := c1.Write(msg) - if err != nil { - t.Fatal(err) - } + msgs := 50 + size := 1024 + for i := 0; i < msgs; i++ { + msg := make([]byte, size) + binary.PutUvarint(msg, uint64(i)) + if _, err := c1.Write(msg); err != nil { + t.Fatal(err) } + } - for i := 0; i < msgs; i++ { - msg := make([]byte, size) - _ = binary.PutUvarint(msg, uint64(i)) - - out := make([]byte, size) - _, err := c2.Read(out) - if err != nil { - t.Fatal(err) - } - - if !bytes.Equal(msg, out) { - t.Fatalf("expected %#v, got %#v", msg, out) - } + for i := 0; i < msgs; i++ { + msg := make([]byte, size) + binary.PutUvarint(msg, uint64(i)) + out := make([]byte, size) + if _, err := c2.Read(out); err != nil { + t.Fatal(err) + } + if !bytes.Equal(msg, out) { + t.Fatalf("expected %#v, got %#v", msg, out) } - done <- struct{}{} - }() - - select { - case <-done: - case <-time.After(5 * time.Second): - t.Fatal("test timeout") } } @@ -77,60 +61,41 @@ func TestTCPPipeBidirections(t *testing.T) { t.Fatal(err) } - done := make(chan struct{}) + msgs := 50 + size := 7 + for i := 0; i < msgs; i++ { + msg := []byte(fmt.Sprintf("ping %02d", i)) + if _, err := c1.Write(msg); err != nil { + t.Fatal(err) + } + } - go func() { - msgs := 50 - size := 7 - for i := 0; i < msgs; i++ { - msg := []byte(fmt.Sprintf("ping %02d", i)) + for i := 0; i < msgs; i++ { + expected := []byte(fmt.Sprintf("ping %02d", i)) + out := make([]byte, size) + if _, err := c2.Read(out); err != nil { + t.Fatal(err) + } - _, err := c1.Write(msg) - if err != nil { + if !bytes.Equal(expected, out) { + t.Fatalf("expected %#v, got %#v", out, expected) + } else { + msg := []byte(fmt.Sprintf("pong %02d", i)) + if _, err := c2.Write(msg); err != nil { t.Fatal(err) } } + } - for i := 0; i < msgs; i++ { - expected := []byte(fmt.Sprintf("ping %02d", i)) - - out := make([]byte, size) - _, err := c2.Read(out) - if err != nil { - t.Fatal(err) - } - - if !bytes.Equal(expected, out) { - t.Fatalf("expected %#v, got %#v", out, expected) - } else { - msg := []byte(fmt.Sprintf("pong %02d", i)) - _, err := c2.Write(msg) - if err != nil { - t.Fatal(err) - } - } + for i := 0; i < msgs; i++ { + expected := []byte(fmt.Sprintf("pong %02d", i)) + out := make([]byte, size) + if _, err := c1.Read(out); err != nil { + t.Fatal(err) } - - for i := 0; i < msgs; i++ { - expected := []byte(fmt.Sprintf("pong %02d", i)) - - out := make([]byte, size) - _, err := c1.Read(out) - if err != nil { - t.Fatal(err) - } - - if !bytes.Equal(expected, out) { - t.Fatalf("expected %#v, got %#v", out, expected) - } + if !bytes.Equal(expected, out) { + t.Fatalf("expected %#v, got %#v", out, expected) } - done <- struct{}{} - }() - - select { - case <-done: - case <-time.After(5 * time.Second): - t.Fatal("test timeout") } } @@ -140,46 +105,35 @@ func TestNetPipe(t *testing.T) { t.Fatal(err) } - done := make(chan struct{}) + msgs := 50 + size := 1024 + var wg sync.WaitGroup + defer wg.Wait() + // netPipe is blocking, so writes are emitted asynchronously + wg.Add(1) go func() { - msgs := 50 - size := 1024 - // netPipe is blocking, so writes are emitted asynchronously - go func() { - for i := 0; i < msgs; i++ { - msg := make([]byte, size) - _ = binary.PutUvarint(msg, uint64(i)) - - _, err := c1.Write(msg) - if err != nil { - t.Fatal(err) - } - } - }() + defer wg.Done() for i := 0; i < msgs; i++ { msg := make([]byte, size) - _ = binary.PutUvarint(msg, uint64(i)) - - out := make([]byte, size) - _, err := c2.Read(out) - if err != nil { - t.Fatal(err) - } - - if !bytes.Equal(msg, out) { - t.Fatalf("expected %#v, got %#v", msg, out) + binary.PutUvarint(msg, uint64(i)) + if _, err := c1.Write(msg); err != nil { + t.Error(err) } } - - done <- struct{}{} }() - select { - case <-done: - case <-time.After(5 * time.Second): - t.Fatal("test timeout") + for i := 0; i < msgs; i++ { + msg := make([]byte, size) + binary.PutUvarint(msg, uint64(i)) + out := make([]byte, size) + if _, err := c2.Read(out); err != nil { + t.Error(err) + } + if !bytes.Equal(msg, out) { + t.Errorf("expected %#v, got %#v", msg, out) + } } } @@ -189,71 +143,60 @@ func TestNetPipeBidirections(t *testing.T) { t.Fatal(err) } - done := make(chan struct{}) + msgs := 1000 + size := 8 + pingTemplate := "ping %03d" + pongTemplate := "pong %03d" + var wg sync.WaitGroup + defer wg.Wait() + // netPipe is blocking, so writes are emitted asynchronously + wg.Add(1) go func() { - msgs := 1000 - size := 8 - pingTemplate := "ping %03d" - pongTemplate := "pong %03d" + defer wg.Done() - // netPipe is blocking, so writes are emitted asynchronously - go func() { - for i := 0; i < msgs; i++ { - msg := []byte(fmt.Sprintf(pingTemplate, i)) - - _, err := c1.Write(msg) - if err != nil { - t.Fatal(err) - } - } - }() - - // netPipe is blocking, so reads for pong are emitted asynchronously - go func() { - for i := 0; i < msgs; i++ { - expected := []byte(fmt.Sprintf(pongTemplate, i)) - - out := make([]byte, size) - _, err := c1.Read(out) - if err != nil { - t.Fatal(err) - } - - if !bytes.Equal(expected, out) { - t.Fatalf("expected %#v, got %#v", expected, out) - } - } - - done <- struct{}{} - }() - - // expect to read pings, and respond with pongs to the alternate connection for i := 0; i < msgs; i++ { - expected := []byte(fmt.Sprintf(pingTemplate, i)) - - out := make([]byte, size) - _, err := c2.Read(out) - if err != nil { - t.Fatal(err) - } - - if !bytes.Equal(expected, out) { - t.Fatalf("expected %#v, got %#v", expected, out) - } else { - msg := []byte(fmt.Sprintf(pongTemplate, i)) - - _, err := c2.Write(msg) - if err != nil { - t.Fatal(err) - } + msg := []byte(fmt.Sprintf(pingTemplate, i)) + if _, err := c1.Write(msg); err != nil { + t.Error(err) } } }() - select { - case <-done: - case <-time.After(5 * time.Second): - t.Fatal("test timeout") + // netPipe is blocking, so reads for pong are emitted asynchronously + wg.Add(1) + go func() { + defer wg.Done() + + for i := 0; i < msgs; i++ { + expected := []byte(fmt.Sprintf(pongTemplate, i)) + out := make([]byte, size) + if _, err := c1.Read(out); err != nil { + t.Error(err) + } + if !bytes.Equal(expected, out) { + t.Errorf("expected %#v, got %#v", expected, out) + } + } + }() + + // expect to read pings, and respond with pongs to the alternate connection + for i := 0; i < msgs; i++ { + expected := []byte(fmt.Sprintf(pingTemplate, i)) + + out := make([]byte, size) + _, err := c2.Read(out) + if err != nil { + t.Fatal(err) + } + + if !bytes.Equal(expected, out) { + t.Errorf("expected %#v, got %#v", expected, out) + } else { + msg := []byte(fmt.Sprintf(pongTemplate, i)) + if _, err := c2.Write(msg); err != nil { + t.Fatal(err) + } + } } } diff --git a/p2p/simulations/http.go b/p2p/simulations/http.go index 3f46baa7d..51d45ce8f 100644 --- a/p2p/simulations/http.go +++ b/p2p/simulations/http.go @@ -384,12 +384,6 @@ func (s *Server) StreamNetworkEvents(w http.ResponseWriter, req *http.Request) { sub := s.network.events.Subscribe(events) defer sub.Unsubscribe() - // stop the stream if the client goes away - var clientGone <-chan bool - if cn, ok := w.(http.CloseNotifier); ok { - clientGone = cn.CloseNotify() - } - // write writes the given event and data to the stream like: // // event: @@ -455,6 +449,7 @@ func (s *Server) StreamNetworkEvents(w http.ResponseWriter, req *http.Request) { } } + clientGone := req.Context().Done() for { select { case event := <-events: @@ -710,7 +705,7 @@ func (s *Server) wrapHandler(handler http.HandlerFunc) httprouter.Handle { w.Header().Set("Access-Control-Allow-Origin", "*") w.Header().Set("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS") - ctx := context.Background() + ctx := req.Context() if id := params.ByName("nodeid"); id != "" { var nodeID enode.ID diff --git a/p2p/simulations/mocker_test.go b/p2p/simulations/mocker_test.go index 069040257..56d81942b 100644 --- a/p2p/simulations/mocker_test.go +++ b/p2p/simulations/mocker_test.go @@ -80,14 +80,17 @@ func TestMocker(t *testing.T) { var opts SubscribeOpts sub, err := client.SubscribeNetwork(events, opts) defer sub.Unsubscribe() - //wait until all nodes are started and connected - //store every node up event in a map (value is irrelevant, mimic Set datatype) + + // wait until all nodes are started and connected + // store every node up event in a map (value is irrelevant, mimic Set datatype) nodemap := make(map[enode.ID]bool) - wg.Add(1) nodesComplete := false connCount := 0 + wg.Add(1) go func() { - for { + defer wg.Done() + + for connCount < (nodeCount-1)*2 { select { case event := <-events: if isNodeUp(event) { @@ -99,14 +102,10 @@ func TestMocker(t *testing.T) { } } else if event.Conn != nil && nodesComplete { connCount += 1 - if connCount == (nodeCount-1)*2 { - wg.Done() - return - } } case <-time.After(30 * time.Second): - wg.Done() - t.Fatalf("Timeout waiting for nodes being started up!") + t.Errorf("Timeout waiting for nodes being started up!") + return } } }() diff --git a/p2p/simulations/network_test.go b/p2p/simulations/network_test.go index d0cb59203..ac1b06a80 100644 --- a/p2p/simulations/network_test.go +++ b/p2p/simulations/network_test.go @@ -81,11 +81,13 @@ func TestSnapshot(t *testing.T) { // connect nodes in a ring // spawn separate thread to avoid deadlock in the event listeners + connectErr := make(chan error, 1) go func() { for i, id := range ids { peerID := ids[(i+1)%len(ids)] if err := network.Connect(id, peerID); err != nil { - t.Fatal(err) + connectErr <- err + return } } }() @@ -100,9 +102,10 @@ OUTER: select { case <-ctx.Done(): t.Fatal(ctx.Err()) + case err := <-connectErr: + t.Fatal(err) case ev := <-evC: if ev.Type == EventTypeConn && !ev.Control { - // fail on any disconnect if !ev.Conn.Up { t.Fatalf("unexpected disconnect: %v -> %v", ev.Conn.One, ev.Conn.Other)