diff --git a/swarm/network/simulation/kademlia.go b/swarm/network/simulation/kademlia.go index 4b880aa0c..00e870a07 100644 --- a/swarm/network/simulation/kademlia.go +++ b/swarm/network/simulation/kademlia.go @@ -103,7 +103,7 @@ func (s *Simulation) kademlias() (ks map[enode.ID]*network.Kademlia) { // in the snapshot are registered in the kademlia. // It differs from WaitTillHealthy, which waits only until all the kademlias are // healthy (it might happen even before all the connections are established). -func (s *Simulation) WaitTillSnapshotRecreated(ctx context.Context, snap simulations.Snapshot) error { +func (s *Simulation) WaitTillSnapshotRecreated(ctx context.Context, snap *simulations.Snapshot) error { expected := getSnapshotConnections(snap.Conns) ticker := time.NewTicker(150 * time.Millisecond) defer ticker.Stop() diff --git a/swarm/network/simulation/kademlia_test.go b/swarm/network/simulation/kademlia_test.go index 9cbc39da5..0ac1e7803 100644 --- a/swarm/network/simulation/kademlia_test.go +++ b/swarm/network/simulation/kademlia_test.go @@ -182,7 +182,7 @@ func TestWaitTillSnapshotRecreated(t *testing.T) { if err != nil { t.Fatal(err) } - err = controlSim.WaitTillSnapshotRecreated(ctx, *snap) + err = controlSim.WaitTillSnapshotRecreated(ctx, snap) if err != nil { t.Fatal(err) } diff --git a/swarm/network/simulation/node.go b/swarm/network/simulation/node.go index 2d618a29d..e24dab21b 100644 --- a/swarm/network/simulation/node.go +++ b/swarm/network/simulation/node.go @@ -17,6 +17,7 @@ package simulation import ( + "context" "encoding/json" "errors" "io/ioutil" @@ -24,7 +25,6 @@ import ( "os" "time" - "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/simulations" "github.com/ethereum/go-ethereum/p2p/simulations/adapters" @@ -217,30 +217,24 @@ func (s *Simulation) AddNodesAndConnectStar(count int, opts ...AddNodeOption) (i // UploadSnapshot uploads a snapshot to the simulation // This method tries to open the json file provided, applies the config to all nodes // and then loads the snapshot into the Simulation network -func (s *Simulation) UploadSnapshot(snapshotFile string, opts ...AddNodeOption) error { +func (s *Simulation) UploadSnapshot(ctx context.Context, snapshotFile string, opts ...AddNodeOption) error { f, err := os.Open(snapshotFile) if err != nil { return err } - defer func() { - err := f.Close() - if err != nil { - log.Error("Error closing snapshot file", "err", err) - } - }() + defer f.Close() + jsonbyte, err := ioutil.ReadAll(f) if err != nil { return err } var snap simulations.Snapshot - err = json.Unmarshal(jsonbyte, &snap) - if err != nil { + if err := json.Unmarshal(jsonbyte, &snap); err != nil { return err } //the snapshot probably has the property EnableMsgEvents not set - //just in case, set it to true! - //(we need this to wait for messages before uploading) + //set it to true (we need this to wait for messages before uploading) for i := range snap.Nodes { snap.Nodes[i].Node.Config.EnableMsgEvents = true snap.Nodes[i].Node.Config.Services = s.serviceNames @@ -249,15 +243,10 @@ func (s *Simulation) UploadSnapshot(snapshotFile string, opts ...AddNodeOption) } } - log.Info("Waiting for p2p connections to be established...") - - //now we can load the snapshot - err = s.Net.Load(&snap) - if err != nil { + if err := s.Net.Load(&snap); err != nil { return err } - log.Info("Snapshot loaded") - return nil + return s.WaitTillSnapshotRecreated(ctx, &snap) } // StartNode starts a node by NodeID. diff --git a/swarm/network/simulation/node_test.go b/swarm/network/simulation/node_test.go index bae5afb26..e1e20a0f1 100644 --- a/swarm/network/simulation/node_test.go +++ b/swarm/network/simulation/node_test.go @@ -289,6 +289,7 @@ func TestUploadSnapshot(t *testing.T) { HiveParams: hp, } kad := network.NewKademlia(addr.Over(), network.NewKadParams()) + b.Store(BucketKeyKademlia, kad) return network.NewBzz(config, kad, nil, nil, nil), nil, nil }, }) @@ -296,12 +297,13 @@ func TestUploadSnapshot(t *testing.T) { nodeCount := 16 log.Debug("Uploading snapshot") - err := s.UploadSnapshot(fmt.Sprintf("../stream/testing/snapshot_%d.json", nodeCount)) + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + err := s.UploadSnapshot(ctx, fmt.Sprintf("../stream/testing/snapshot_%d.json", nodeCount)) if err != nil { t.Fatalf("Error uploading snapshot to simulation network: %v", err) } - ctx := context.Background() log.Debug("Starting simulation...") s.Run(ctx, func(ctx context.Context, sim *Simulation) error { log.Debug("Checking") diff --git a/swarm/network/stream/snapshot_retrieval_test.go b/swarm/network/stream/snapshot_retrieval_test.go index 2fdf8e9e3..2957999f8 100644 --- a/swarm/network/stream/snapshot_retrieval_test.go +++ b/swarm/network/stream/snapshot_retrieval_test.go @@ -22,8 +22,6 @@ import ( "testing" "time" - "github.com/ethereum/go-ethereum/swarm/testutil" - "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/simulations/adapters" @@ -31,6 +29,7 @@ import ( "github.com/ethereum/go-ethereum/swarm/network/simulation" "github.com/ethereum/go-ethereum/swarm/state" "github.com/ethereum/go-ethereum/swarm/storage" + "github.com/ethereum/go-ethereum/swarm/testutil" ) //constants for random file generation @@ -155,14 +154,15 @@ func runFileRetrievalTest(nodeCount int) error { //array where the generated chunk hashes will be stored conf.hashes = make([]storage.Address, 0) - err := sim.UploadSnapshot(fmt.Sprintf("testing/snapshot_%d.json", nodeCount)) + ctx, cancelSimRun := context.WithTimeout(context.Background(), 3*time.Minute) + defer cancelSimRun() + + filename := fmt.Sprintf("testing/snapshot_%d.json", nodeCount) + err := sim.UploadSnapshot(ctx, filename) if err != nil { return err } - ctx, cancelSimRun := context.WithTimeout(context.Background(), 3*time.Minute) - defer cancelSimRun() - log.Info("Starting simulation") result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error { @@ -188,9 +188,6 @@ func runFileRetrievalTest(nodeCount int) error { if err != nil { return err } - if _, err := sim.WaitTillHealthy(ctx); err != nil { - return err - } log.Info("network healthy, start file checks") @@ -253,12 +250,15 @@ func runRetrievalTest(t *testing.T, chunkCount int, nodeCount int) error { //array where the generated chunk hashes will be stored conf.hashes = make([]storage.Address, 0) - err := sim.UploadSnapshot(fmt.Sprintf("testing/snapshot_%d.json", nodeCount)) + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + defer cancel() + + filename := fmt.Sprintf("testing/snapshot_%d.json", nodeCount) + err := sim.UploadSnapshot(ctx, filename) if err != nil { return err } - ctx := context.Background() result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error { nodeIDs := sim.UpNodeIDs() for _, n := range nodeIDs { @@ -283,9 +283,6 @@ func runRetrievalTest(t *testing.T, chunkCount int, nodeCount int) error { if err != nil { return err } - if _, err := sim.WaitTillHealthy(ctx); err != nil { - return err - } // File retrieval check is repeated until all uploaded files are retrieved from all nodes // or until the timeout is reached. diff --git a/swarm/network/stream/snapshot_sync_test.go b/swarm/network/stream/snapshot_sync_test.go index 9737ec0a5..ce1e69db2 100644 --- a/swarm/network/stream/snapshot_sync_test.go +++ b/swarm/network/stream/snapshot_sync_test.go @@ -147,20 +147,16 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) { //array where the generated chunk hashes will be stored conf.hashes = make([]storage.Address, 0) - err := sim.UploadSnapshot(fmt.Sprintf("testing/snapshot_%d.json", nodeCount)) + ctx, cancelSimRun := context.WithTimeout(context.Background(), 3*time.Minute) + defer cancelSimRun() + + filename := fmt.Sprintf("testing/snapshot_%d.json", nodeCount) + err := sim.UploadSnapshot(ctx, filename) if err != nil { t.Fatal(err) } - ctx, cancelSimRun := context.WithTimeout(context.Background(), 2*time.Minute) - defer cancelSimRun() - - if _, err := sim.WaitTillHealthy(ctx); err != nil { - t.Fatal(err) - } - result := runSim(conf, ctx, sim, chunkCount) - if result.Error != nil { t.Fatal(result.Error) } diff --git a/swarm/network/stream/streamer_test.go b/swarm/network/stream/streamer_test.go index 83719af8a..bdd3087bb 100644 --- a/swarm/network/stream/streamer_test.go +++ b/swarm/network/stream/streamer_test.go @@ -1257,9 +1257,10 @@ func TestGetSubscriptionsRPC(t *testing.T) { simulation.NewPeerEventsFilter().ReceivedMessages().Protocol("stream").MsgCode(subscribeMsgCode), ) - // upload a snapshot - err := sim.UploadSnapshot(fmt.Sprintf("testing/snapshot_%d.json", nodeCount)) - if err != nil { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + defer cancel() + filename := fmt.Sprintf("testing/snapshot_%d.json", nodeCount) + if err := sim.UploadSnapshot(ctx, filename); err != nil { t.Fatal(err) } diff --git a/swarm/pss/prox_test.go b/swarm/pss/prox_test.go index 1c8538d50..0b60ec39a 100644 --- a/swarm/pss/prox_test.go +++ b/swarm/pss/prox_test.go @@ -3,11 +3,8 @@ package pss import ( "context" "encoding/binary" - "encoding/json" "errors" "fmt" - "io/ioutil" - "os" "strconv" "strings" "sync" @@ -20,7 +17,6 @@ import ( "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/enode" - "github.com/ethereum/go-ethereum/p2p/simulations" "github.com/ethereum/go-ethereum/p2p/simulations/adapters" "github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/swarm/network" @@ -105,24 +101,6 @@ func getCmdParams(t *testing.T) (int, int) { return int(msgCount), int(nodeCount) } -func readSnapshot(t *testing.T, nodeCount int) simulations.Snapshot { - f, err := os.Open(fmt.Sprintf("testdata/snapshot_%d.json", nodeCount)) - if err != nil { - t.Fatal(err) - } - defer f.Close() - jsonbyte, err := ioutil.ReadAll(f) - if err != nil { - t.Fatal(err) - } - var snap simulations.Snapshot - err = json.Unmarshal(jsonbyte, &snap) - if err != nil { - t.Fatal(err) - } - return snap -} - func newTestData() *testData { return &testData{ kademlias: make(map[enode.ID]*network.Kademlia), @@ -235,17 +213,13 @@ func testProxNetwork(t *testing.T) { services := newProxServices(tstdata, true, handlerContextFuncs, tstdata.kademlias) tstdata.sim = simulation.New(services) defer tstdata.sim.Close() - err := tstdata.sim.UploadSnapshot(fmt.Sprintf("testdata/snapshot_%d.json", nodeCount)) + ctx, cancel := context.WithTimeout(context.Background(), 180*time.Second) + defer cancel() + filename := fmt.Sprintf("testdata/snapshot_%d.json", nodeCount) + err := tstdata.sim.UploadSnapshot(ctx, filename) if err != nil { t.Fatal(err) } - ctx, cancel := context.WithTimeout(context.Background(), time.Second*120) - defer cancel() - snap := readSnapshot(t, nodeCount) - err = tstdata.sim.WaitTillSnapshotRecreated(ctx, snap) - if err != nil { - t.Fatalf("failed to recreate snapshot: %s", err) - } tstdata.init(msgCount) // initialize the test data wrapper := func(c context.Context, _ *simulation.Simulation) error { return testRoutine(tstdata, c) @@ -426,7 +400,6 @@ func newProxServices(tstdata *testData, allowRaw bool, handlerContextFuncs map[T if err != nil { return nil, nil, err } - b.Store(simulation.BucketKeyKademlia, pskad) // register the handlers we've been passed var deregisters []func() @@ -448,6 +421,8 @@ func newProxServices(tstdata *testData, allowRaw bool, handlerContextFuncs map[T Public: false, }) + b.Store(simulation.BucketKeyKademlia, pskad) + // return Pss and cleanups return ps, func() { // run the handler deregister functions in reverse order