diff --git a/swarm/network/stream/snapshot_retrieval_test.go b/swarm/network/stream/snapshot_retrieval_test.go index e34f87951..50617b5cf 100644 --- a/swarm/network/stream/snapshot_retrieval_test.go +++ b/swarm/network/stream/snapshot_retrieval_test.go @@ -16,8 +16,10 @@ package stream import ( + "bytes" "context" "fmt" + "io" "sync" "testing" "time" @@ -33,17 +35,17 @@ import ( "github.com/ethereum/go-ethereum/swarm/testutil" ) -//constants for random file generation +// constants for random file generation const ( minFileSize = 2 maxFileSize = 40 ) -//This test is a retrieval test for nodes. -//A configurable number of nodes can be -//provided to the test. -//Files are uploaded to nodes, other nodes try to retrieve the file -//Number of nodes can be provided via commandline too. +// TestFileRetrieval is a retrieval test for nodes. +// A configurable number of nodes can be +// provided to the test. +// Files are uploaded to nodes, other nodes try to retrieve the file +// Number of nodes can be provided via commandline too. func TestFileRetrieval(t *testing.T) { var nodeCount []int @@ -61,26 +63,57 @@ func TestFileRetrieval(t *testing.T) { } for _, nc := range nodeCount { - if err := runFileRetrievalTest(nc); err != nil { - t.Error(err) + runFileRetrievalTest(t, nc) + } +} + +// TestPureRetrieval tests pure retrieval without syncing +// A configurable number of nodes and chunks +// can be provided to the test. +// A number of random chunks is generated, then stored directly in +// each node's localstore according to their address. +// Each chunk is supposed to end up at certain nodes +// With retrieval we then make sure that every node can actually retrieve +// the chunks. +func TestPureRetrieval(t *testing.T) { + var nodeCount []int + var chunkCount []int + + if *nodes != 0 && *chunks != 0 { + nodeCount = []int{*nodes} + chunkCount = []int{*chunks} + } else { + nodeCount = []int{16} + chunkCount = []int{150} + + if *longrunning { + nodeCount = append(nodeCount, 32, 64) + chunkCount = append(chunkCount, 32, 256) + } else if testutil.RaceEnabled { + nodeCount = []int{4} + chunkCount = []int{4} + } + + } + + for _, nc := range nodeCount { + for _, c := range chunkCount { + runPureRetrievalTest(t, nc, c) } } } -//This test is a retrieval test for nodes. -//One node is randomly selected to be the pivot node. -//A configurable number of chunks and nodes can be -//provided to the test, the number of chunks is uploaded -//to the pivot node and other nodes try to retrieve the chunk(s). -//Number of chunks and nodes can be provided via commandline too. +// TestRetrieval tests retrieval of chunks by random nodes. +// One node is randomly selected to be the pivot node. +// A configurable number of chunks and nodes can be +// provided to the test, the number of chunks is uploaded +// to the pivot node and other nodes try to retrieve the chunk(s). +// Number of chunks and nodes can be provided via commandline too. func TestRetrieval(t *testing.T) { - //if nodes/chunks have been provided via commandline, - //run the tests with these values + // if nodes/chunks have been provided via commandline, + // run the tests with these values if *nodes != 0 && *chunks != 0 { - err := runRetrievalTest(t, *chunks, *nodes) - if err != nil { - t.Fatal(err) - } + runRetrievalTest(t, *chunks, *nodes) } else { nodeCnt := []int{16} chnkCnt := []int{32} @@ -96,10 +129,7 @@ func TestRetrieval(t *testing.T) { for _, n := range nodeCnt { for _, c := range chnkCnt { t.Run(fmt.Sprintf("TestRetrieval_%d_%d", n, c), func(t *testing.T) { - err := runRetrievalTest(t, c, n) - if err != nil { - t.Fatal(err) - } + runRetrievalTest(t, c, n) }) } } @@ -132,15 +162,160 @@ var retrievalSimServiceMap = map[string]simulation.ServiceFunc{ }, } -/* -The test loads a snapshot file to construct the swarm network, -assuming that the snapshot file identifies a healthy -kademlia network. Nevertheless a health check runs in the -simulation's `action` function. +// runPureRetrievalTest by uploading a snapshot, +// then starting a simulation, distribute chunks to nodes +// and start retrieval. +// The snapshot should have 'streamer' in its service list. +func runPureRetrievalTest(t *testing.T, nodeCount int, chunkCount int) { + + t.Helper() + // the pure retrieval test needs a different service map, as we want + // syncing disabled and we don't need to set the syncUpdateDelay + sim := simulation.New(map[string]simulation.ServiceFunc{ + "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { + addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket) + if err != nil { + return nil, nil, err + } + + r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ + Syncing: SyncingDisabled, + }, nil) + + cleanup = func() { + r.Close() + clean() + } + + return r, cleanup, nil + }, + }, + ) + defer sim.Close() + + log.Info("Initializing test config", "node count", nodeCount) + + conf := &synctestConfig{} + //map of discover ID to indexes of chunks expected at that ID + conf.idToChunksMap = make(map[enode.ID][]int) + //map of overlay address to discover ID + conf.addrToIDMap = make(map[string]enode.ID) + //array where the generated chunk hashes will be stored + conf.hashes = make([]storage.Address, 0) + + ctx, cancelSimRun := context.WithTimeout(context.Background(), 3*time.Minute) + defer cancelSimRun() + + filename := fmt.Sprintf("testing/snapshot_%d.json", nodeCount) + err := sim.UploadSnapshot(ctx, filename) + if err != nil { + t.Fatal(err) + } + + log.Info("Starting simulation") + + result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error { + nodeIDs := sim.UpNodeIDs() + // first iteration: create addresses + for _, n := range nodeIDs { + //get the kademlia overlay address from this ID + a := n.Bytes() + //append it to the array of all overlay addresses + conf.addrs = append(conf.addrs, a) + //the proximity calculation is on overlay addr, + //the p2p/simulations check func triggers on enode.ID, + //so we need to know which overlay addr maps to which nodeID + conf.addrToIDMap[string(a)] = n + } + + // now create random chunks + chunks := storage.GenerateRandomChunks(int64(chunkSize), chunkCount) + for _, chunk := range chunks { + conf.hashes = append(conf.hashes, chunk.Address()) + } + + log.Debug("random chunks generated, mapping keys to nodes") + + // map addresses to nodes + mapKeysToNodes(conf) + + // second iteration: storing chunks at the peer whose + // overlay address is closest to a particular chunk's hash + log.Debug("storing every chunk at correspondent node store") + for _, id := range nodeIDs { + // for every chunk for this node (which are only indexes)... + for _, ch := range conf.idToChunksMap[id] { + item, ok := sim.NodeItem(id, bucketKeyStore) + if !ok { + return fmt.Errorf("Error accessing localstore") + } + lstore := item.(chunk.Store) + // ...get the actual chunk + for _, chnk := range chunks { + if bytes.Equal(chnk.Address(), conf.hashes[ch]) { + // ...and store it in the localstore + if _, err = lstore.Put(ctx, chunk.ModePutUpload, chnk); err != nil { + return err + } + } + } + } + } + + // now try to retrieve every chunk from every node + log.Debug("starting retrieval") + cnt := 0 + + for _, id := range nodeIDs { + item, ok := sim.NodeItem(id, bucketKeyFileStore) + if !ok { + return fmt.Errorf("No filestore") + } + fileStore := item.(*storage.FileStore) + for _, chunk := range chunks { + reader, _ := fileStore.Retrieve(context.TODO(), chunk.Address()) + content := make([]byte, chunkSize) + size, err := reader.Read(content) + //check chunk size and content + ok := true + if err != io.EOF { + log.Debug("Retrieve error", "err", err, "hash", chunk.Address(), "nodeId", id) + ok = false + } + if size != chunkSize { + log.Debug("size not equal chunkSize", "size", size, "hash", chunk.Address(), "nodeId", id) + ok = false + } + // skip chunk "metadata" for chunk.Data() + if !bytes.Equal(content, chunk.Data()[8:]) { + log.Debug("content not equal chunk data", "hash", chunk.Address(), "nodeId", id) + ok = false + } + if !ok { + return fmt.Errorf("Expected test to succeed at first run, but failed with chunk not found") + } + log.Debug(fmt.Sprintf("chunk with root hash %x successfully retrieved", chunk.Address())) + cnt++ + } + } + log.Info("retrieval terminated, chunks retrieved: ", "count", cnt) + return nil + + }) + + log.Info("Simulation terminated") + + if result.Error != nil { + t.Fatal(result.Error) + } +} + +// runFileRetrievalTest loads a snapshot file to construct the swarm network. +// The snapshot should have 'streamer' in its service list. +func runFileRetrievalTest(t *testing.T, nodeCount int) { + + t.Helper() -The snapshot should have 'streamer' in its service list. -*/ -func runFileRetrievalTest(nodeCount int) error { sim := simulation.New(retrievalSimServiceMap) defer sim.Close() @@ -160,7 +335,7 @@ func runFileRetrievalTest(nodeCount int) error { filename := fmt.Sprintf("testing/snapshot_%d.json", nodeCount) err := sim.UploadSnapshot(ctx, filename) if err != nil { - return err + t.Fatal(err) } log.Info("Starting simulation") @@ -180,9 +355,6 @@ func runFileRetrievalTest(nodeCount int) error { //an array for the random files var randomFiles []string - //channel to signal when the upload has finished - //uploadFinished := make(chan struct{}) - //channel to trigger new node checks conf.hashes, randomFiles, err = uploadFilesToNodes(sim) if err != nil { @@ -221,24 +393,17 @@ func runFileRetrievalTest(nodeCount int) error { log.Info("Simulation terminated") if result.Error != nil { - return result.Error + t.Fatal(result.Error) } - - return nil } -/* -The test generates the given number of chunks. +// runRetrievalTest generates the given number of chunks. +// The test loads a snapshot file to construct the swarm network. +// The snapshot should have 'streamer' in its service list. +func runRetrievalTest(t *testing.T, chunkCount int, nodeCount int) { -The test loads a snapshot file to construct the swarm network, -assuming that the snapshot file identifies a healthy -kademlia network. Nevertheless a health check runs in the -simulation's `action` function. - -The snapshot should have 'streamer' in its service list. -*/ -func runRetrievalTest(t *testing.T, chunkCount int, nodeCount int) error { t.Helper() + sim := simulation.New(retrievalSimServiceMap) defer sim.Close() @@ -256,7 +421,7 @@ func runRetrievalTest(t *testing.T, chunkCount int, nodeCount int) error { filename := fmt.Sprintf("testing/snapshot_%d.json", nodeCount) err := sim.UploadSnapshot(ctx, filename) if err != nil { - return err + t.Fatal(err) } result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error { @@ -278,8 +443,8 @@ func runRetrievalTest(t *testing.T, chunkCount int, nodeCount int) error { if !ok { return fmt.Errorf("No localstore") } - store := item.(chunk.Store) - conf.hashes, err = uploadFileToSingleNodeStore(node.ID(), chunkCount, store) + lstore := item.(chunk.Store) + conf.hashes, err = uploadFileToSingleNodeStore(node.ID(), chunkCount, lstore) if err != nil { return err } @@ -314,8 +479,6 @@ func runRetrievalTest(t *testing.T, chunkCount int, nodeCount int) error { }) if result.Error != nil { - return result.Error + t.Fatal(result.Error) } - - return nil }