diff --git a/p2p/simulations/network_test.go b/p2p/simulations/network_test.go index f34935265..b7852addb 100644 --- a/p2p/simulations/network_test.go +++ b/p2p/simulations/network_test.go @@ -18,14 +18,266 @@ package simulations import ( "context" + "encoding/json" "fmt" + "strconv" + "strings" "testing" "time" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/simulations/adapters" ) +// Tests that a created snapshot with a minimal service only contains the expected connections +// and that a network when loaded with this snapshot only contains those same connections +func TestSnapshot(t *testing.T) { + + // PART I + // create snapshot from ring network + + // this is a minimal service, whose protocol will take exactly one message OR close of connection before quitting + adapter := adapters.NewSimAdapter(adapters.Services{ + "noopwoop": func(ctx *adapters.ServiceContext) (node.Service, error) { + return NewNoopService(nil), nil + }, + }) + + // create network + network := NewNetwork(adapter, &NetworkConfig{ + DefaultService: "noopwoop", + }) + // \todo consider making a member of network, set to true threadsafe when shutdown + runningOne := true + defer func() { + if runningOne { + network.Shutdown() + } + }() + + // create and start nodes + nodeCount := 20 + ids := make([]enode.ID, nodeCount) + for i := 0; i < nodeCount; i++ { + conf := adapters.RandomNodeConfig() + node, err := network.NewNodeWithConfig(conf) + if err != nil { + t.Fatalf("error creating node: %s", err) + } + if err := network.Start(node.ID()); err != nil { + t.Fatalf("error starting node: %s", err) + } + ids[i] = node.ID() + } + + // subscribe to peer events + evC := make(chan *Event) + sub := network.Events().Subscribe(evC) + defer sub.Unsubscribe() + + // connect nodes in a ring + // spawn separate thread to avoid deadlock in the event listeners + go func() { + for i, id := range ids { + peerID := ids[(i+1)%len(ids)] + if err := network.Connect(id, peerID); err != nil { + t.Fatal(err) + } + } + }() + + // collect connection events up to expected number + ctx, cancel := context.WithTimeout(context.TODO(), time.Second) + defer cancel() + checkIds := make(map[enode.ID][]enode.ID) + connEventCount := nodeCount +OUTER: + for { + select { + case <-ctx.Done(): + t.Fatal(ctx.Err()) + case ev := <-evC: + if ev.Type == EventTypeConn && !ev.Control { + + // fail on any disconnect + if !ev.Conn.Up { + t.Fatalf("unexpected disconnect: %v -> %v", ev.Conn.One, ev.Conn.Other) + } + checkIds[ev.Conn.One] = append(checkIds[ev.Conn.One], ev.Conn.Other) + checkIds[ev.Conn.Other] = append(checkIds[ev.Conn.Other], ev.Conn.One) + connEventCount-- + log.Debug("ev", "count", connEventCount) + if connEventCount == 0 { + break OUTER + } + } + } + } + + // create snapshot of current network + snap, err := network.Snapshot() + if err != nil { + t.Fatal(err) + } + j, err := json.Marshal(snap) + if err != nil { + t.Fatal(err) + } + log.Debug("snapshot taken", "nodes", len(snap.Nodes), "conns", len(snap.Conns), "json", string(j)) + + // verify that the snap element numbers check out + if len(checkIds) != len(snap.Conns) || len(checkIds) != len(snap.Nodes) { + t.Fatalf("snapshot wrong node,conn counts %d,%d != %d", len(snap.Nodes), len(snap.Conns), len(checkIds)) + } + + // shut down sim network + runningOne = false + sub.Unsubscribe() + network.Shutdown() + + // check that we have all the expected connections in the snapshot + for nodid, nodConns := range checkIds { + for _, nodConn := range nodConns { + var match bool + for _, snapConn := range snap.Conns { + if snapConn.One == nodid && snapConn.Other == nodConn { + match = true + break + } else if snapConn.Other == nodid && snapConn.One == nodConn { + match = true + break + } + } + if !match { + t.Fatalf("snapshot missing conn %v -> %v", nodid, nodConn) + } + } + } + log.Info("snapshot checked") + + // PART II + // load snapshot and verify that exactly same connections are formed + + adapter = adapters.NewSimAdapter(adapters.Services{ + "noopwoop": func(ctx *adapters.ServiceContext) (node.Service, error) { + return NewNoopService(nil), nil + }, + }) + network = NewNetwork(adapter, &NetworkConfig{ + DefaultService: "noopwoop", + }) + defer func() { + network.Shutdown() + }() + + // subscribe to peer events + // every node up and conn up event will generate one additional control event + // therefore multiply the count by two + evC = make(chan *Event, (len(snap.Conns)*2)+(len(snap.Nodes)*2)) + sub = network.Events().Subscribe(evC) + defer sub.Unsubscribe() + + // load the snapshot + // spawn separate thread to avoid deadlock in the event listeners + err = network.Load(snap) + if err != nil { + t.Fatal(err) + } + + // collect connection events up to expected number + ctx, cancel = context.WithTimeout(context.TODO(), time.Second*3) + defer cancel() + + connEventCount = nodeCount + +OUTER_TWO: + for { + select { + case <-ctx.Done(): + t.Fatal(ctx.Err()) + case ev := <-evC: + if ev.Type == EventTypeConn && !ev.Control { + + // fail on any disconnect + if !ev.Conn.Up { + t.Fatalf("unexpected disconnect: %v -> %v", ev.Conn.One, ev.Conn.Other) + } + log.Debug("conn", "on", ev.Conn.One, "other", ev.Conn.Other) + checkIds[ev.Conn.One] = append(checkIds[ev.Conn.One], ev.Conn.Other) + checkIds[ev.Conn.Other] = append(checkIds[ev.Conn.Other], ev.Conn.One) + connEventCount-- + log.Debug("ev", "count", connEventCount) + if connEventCount == 0 { + break OUTER_TWO + } + } + } + } + + // check that we have all expected connections in the network + for _, snapConn := range snap.Conns { + var match bool + for nodid, nodConns := range checkIds { + for _, nodConn := range nodConns { + if snapConn.One == nodid && snapConn.Other == nodConn { + match = true + break + } else if snapConn.Other == nodid && snapConn.One == nodConn { + match = true + break + } + } + } + if !match { + t.Fatalf("network missing conn %v -> %v", snapConn.One, snapConn.Other) + } + } + + // verify that network didn't generate any other additional connection events after the ones we have collected within a reasonable period of time + ctx, cancel = context.WithTimeout(context.TODO(), time.Second) + defer cancel() + select { + case <-ctx.Done(): + case ev := <-evC: + if ev.Type == EventTypeConn { + t.Fatalf("Superfluous conn found %v -> %v", ev.Conn.One, ev.Conn.Other) + } + } + + // This test validates if all connections from the snapshot + // are created in the network. + t.Run("conns after load", func(t *testing.T) { + // Create new network. + n := NewNetwork( + adapters.NewSimAdapter(adapters.Services{ + "noopwoop": func(ctx *adapters.ServiceContext) (node.Service, error) { + return NewNoopService(nil), nil + }, + }), + &NetworkConfig{ + DefaultService: "noopwoop", + }, + ) + defer n.Shutdown() + + // Load the same snapshot. + err := n.Load(snap) + if err != nil { + t.Fatal(err) + } + + // Check every connection from the snapshot + // if it is in the network, too. + for _, c := range snap.Conns { + if n.GetConn(c.One, c.Other) == nil { + t.Errorf("missing connection: %s -> %s", c.One, c.Other) + } + } + }) +} + // TestNetworkSimulation creates a multi-node simulation network with each node // connected in a ring topology, checks that all nodes successfully handshake // with each other and that a snapshot fully represents the desired topology @@ -158,3 +410,78 @@ func triggerChecks(ctx context.Context, ids []enode.ID, trigger chan enode.ID, i } } } + +// \todo: refactor to implement shapshots +// and connect configuration methods once these are moved from +// swarm/network/simulations/connect.go +func BenchmarkMinimalService(b *testing.B) { + b.Run("ring/32", benchmarkMinimalServiceTmp) +} + +func benchmarkMinimalServiceTmp(b *testing.B) { + + // stop timer to discard setup time pollution + args := strings.Split(b.Name(), "/") + nodeCount, err := strconv.ParseInt(args[2], 10, 16) + if err != nil { + b.Fatal(err) + } + + for i := 0; i < b.N; i++ { + // this is a minimal service, whose protocol will close a channel upon run of protocol + // making it possible to bench the time it takes for the service to start and protocol actually to be run + protoCMap := make(map[enode.ID]map[enode.ID]chan struct{}) + adapter := adapters.NewSimAdapter(adapters.Services{ + "noopwoop": func(ctx *adapters.ServiceContext) (node.Service, error) { + protoCMap[ctx.Config.ID] = make(map[enode.ID]chan struct{}) + svc := NewNoopService(protoCMap[ctx.Config.ID]) + return svc, nil + }, + }) + + // create network + network := NewNetwork(adapter, &NetworkConfig{ + DefaultService: "noopwoop", + }) + defer network.Shutdown() + + // create and start nodes + ids := make([]enode.ID, nodeCount) + for i := 0; i < int(nodeCount); i++ { + conf := adapters.RandomNodeConfig() + node, err := network.NewNodeWithConfig(conf) + if err != nil { + b.Fatalf("error creating node: %s", err) + } + if err := network.Start(node.ID()); err != nil { + b.Fatalf("error starting node: %s", err) + } + ids[i] = node.ID() + } + + // ready, set, go + b.ResetTimer() + + // connect nodes in a ring + for i, id := range ids { + peerID := ids[(i+1)%len(ids)] + if err := network.Connect(id, peerID); err != nil { + b.Fatal(err) + } + } + + // wait for all protocols to signal to close down + ctx, cancel := context.WithTimeout(context.TODO(), time.Second) + defer cancel() + for nodid, peers := range protoCMap { + for peerid, peerC := range peers { + log.Debug("getting ", "node", nodid, "peer", peerid) + select { + case <-ctx.Done(): + b.Fatal(ctx.Err()) + case <-peerC: + } + } + } + } +} diff --git a/swarm/network/simulation/simulation_test.go b/swarm/network/simulation/simulation_test.go index ca8599d7c..4667a2abc 100644 --- a/swarm/network/simulation/simulation_test.go +++ b/swarm/network/simulation/simulation_test.go @@ -26,9 +26,8 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/node" - "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/simulations" "github.com/ethereum/go-ethereum/p2p/simulations/adapters" - "github.com/ethereum/go-ethereum/rpc" colorable "github.com/mattn/go-colorable" ) @@ -182,39 +181,23 @@ func noopServiceFunc(ctx *adapters.ServiceContext, b *sync.Map) (node.Service, f return newNoopService(), nil, nil } -// noopService is the service that does not do anything -// but implements node.Service interface. -type noopService struct{} - func newNoopService() node.Service { return &noopService{} } -func (t *noopService) Protocols() []p2p.Protocol { - return []p2p.Protocol{} -} - -func (t *noopService) APIs() []rpc.API { - return []rpc.API{} -} - -func (t *noopService) Start(server *p2p.Server) error { - return nil -} - -func (t *noopService) Stop() error { - return nil -} - -// a helper function for most basic noop service -// of a different type then noopService to test +// a helper function for most basic Noop service +// of a different type then NoopService to test // multiple services on one node. func noopService2Func(ctx *adapters.ServiceContext, b *sync.Map) (node.Service, func(), error) { return new(noopService2), nil, nil } -// noopService2 is the service that does not do anything +// NoopService2 is the service that does not do anything // but implements node.Service interface. type noopService2 struct { - noopService + simulations.NoopService +} + +type noopService struct { + simulations.NoopService }