diff --git a/swarm/network/stream/visualized_snapshot_sync_sim_test.go b/swarm/network/stream/visualized_snapshot_sync_sim_test.go deleted file mode 100644 index 5a54ad0a8..000000000 --- a/swarm/network/stream/visualized_snapshot_sync_sim_test.go +++ /dev/null @@ -1,342 +0,0 @@ -// Copyright 2018 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see . - -// +build withserver - -package stream - -import ( - "bytes" - "context" - "errors" - "fmt" - "io" - "sync" - "testing" - "time" - - "github.com/ethereum/go-ethereum/node" - "github.com/ethereum/go-ethereum/p2p" - "github.com/ethereum/go-ethereum/p2p/enode" - "github.com/ethereum/go-ethereum/p2p/protocols" - "github.com/ethereum/go-ethereum/p2p/simulations" - "github.com/ethereum/go-ethereum/p2p/simulations/adapters" - "github.com/ethereum/go-ethereum/rlp" - "github.com/ethereum/go-ethereum/swarm/log" - "github.com/ethereum/go-ethereum/swarm/network/simulation" - "github.com/ethereum/go-ethereum/swarm/state" - "github.com/ethereum/go-ethereum/swarm/storage" -) - -/* -The tests in this file need to be executed with - - -tags=withserver - -Also, they will stall if executed stand-alone, because they wait -for the visualization frontend to send a POST /runsim message. -*/ - -//setup the sim, evaluate nodeCount and chunkCount and create the sim -func setupSim(serviceMap map[string]simulation.ServiceFunc) (int, int, *simulation.Simulation) { - nodeCount := *nodes - chunkCount := *chunks - - if nodeCount == 0 || chunkCount == 0 { - nodeCount = 32 - chunkCount = 1 - } - - //setup the simulation with server, which means the sim won't run - //until it receives a POST /runsim from the frontend - sim := simulation.New(serviceMap).WithServer(":8888") - return nodeCount, chunkCount, sim -} - -//This test requests bogus hashes into the network -func TestNonExistingHashesWithServer(t *testing.T) { - - nodeCount, _, sim := setupSim(retrievalSimServiceMap) - defer sim.Close() - - err := sim.UploadSnapshot(fmt.Sprintf("testing/snapshot_%d.json", nodeCount)) - if err != nil { - panic(err) - } - - //in order to get some meaningful visualization, it is beneficial - //to define a minimum duration of this test - testDuration := 20 * time.Second - - result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) { - disconnected := watchDisconnections(ctx, sim) - defer func() { - if err != nil { - if yes, ok := disconnected.Load().(bool); ok && yes { - err = errors.New("disconnect events received") - } - } - }() - - //check on the node's FileStore (netstore) - id := sim.Net.GetRandomUpNode().ID() - item, ok := sim.NodeItem(id, bucketKeyFileStore) - if !ok { - return errors.New("No filestore") - } - fileStore := item.(*storage.FileStore) - //create a bogus hash - fakeHash := storage.GenerateRandomChunk(1000).Address() - //try to retrieve it - will propagate RetrieveRequestMsg into the network - reader, _ := fileStore.Retrieve(context.TODO(), fakeHash) - if _, err := reader.Size(ctx, nil); err != nil { - log.Debug("expected error for non-existing chunk") - } - //sleep so that the frontend can have something to display - time.Sleep(testDuration) - - return nil - }) - if result.Error != nil { - sendSimTerminatedEvent(sim) - t.Fatal(result.Error) - } - - sendSimTerminatedEvent(sim) - -} - -//send a termination event to the frontend -func sendSimTerminatedEvent(sim *simulation.Simulation) { - evt := &simulations.Event{ - Type: EventTypeSimTerminated, - Control: false, - } - sim.Net.Events().Send(evt) -} - -//This test is the same as the snapshot sync test, -//but with a HTTP server -//It also sends some custom events so that the frontend -//can visualize messages like SendOfferedMsg, WantedHashesMsg, DeliveryMsg -func TestSnapshotSyncWithServer(t *testing.T) { - //define a wrapper object to be able to pass around data - wrapper := &netWrapper{} - - sim := simulation.New(map[string]simulation.ServiceFunc{ - "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { - addr, netStore, delivery, clean, err := newNetStoreAndDeliveryWithRequestFunc(ctx, bucket, dummyRequestFromPeers) - if err != nil { - return nil, nil, err - } - - r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ - Retrieval: RetrievalDisabled, - Syncing: SyncingAutoSubscribe, - SyncUpdateDelay: 3 * time.Second, - }, nil) - - tr := &testRegistry{ - Registry: r, - w: wrapper, - } - - bucket.Store(bucketKeyRegistry, tr) - - cleanup = func() { - tr.Close() - clean() - } - - return tr, cleanup, nil - }, - }).WithServer(":8888") //start with the HTTP server - - nodeCount, chunkCount, sim := setupSim(simServiceMap) - defer sim.Close() - - log.Info(fmt.Sprintf("Running the simulation with %d nodes and %d chunks", nodeCount, chunkCount)) - log.Info("Initializing test config") - - conf := &synctestConfig{} - //map of discover ID to indexes of chunks expected at that ID - conf.idToChunksMap = make(map[enode.ID][]int) - //map of overlay address to discover ID - conf.addrToIDMap = make(map[string]enode.ID) - //array where the generated chunk hashes will be stored - conf.hashes = make([]storage.Address, 0) - //pass the network to the wrapper object - wrapper.setNetwork(sim.Net) - err := sim.UploadSnapshot(fmt.Sprintf("testing/snapshot_%d.json", nodeCount)) - if err != nil { - panic(err) - } - - //run the sim - result := runSim(conf, ctx, sim, chunkCount) - - //send terminated event - evt := &simulations.Event{ - Type: EventTypeSimTerminated, - Control: false, - } - go sim.Net.Events().Send(evt) - - if result.Error != nil { - panic(result.Error) - } - log.Info("Simulation ended") -} - -//testRegistry embeds registry -//it allows to replace the protocol run function -type testRegistry struct { - *Registry - w *netWrapper -} - -//Protocols replaces the protocol's run function -func (tr *testRegistry) Protocols() []p2p.Protocol { - regProto := tr.Registry.Protocols() - //set the `stream` protocol's run function with the testRegistry's one - regProto[0].Run = tr.runProto - return regProto -} - -//runProto is the new overwritten protocol's run function for this test -func (tr *testRegistry) runProto(p *p2p.Peer, rw p2p.MsgReadWriter) error { - //create a custom rw message ReadWriter - testRw := &testMsgReadWriter{ - MsgReadWriter: rw, - Peer: p, - w: tr.w, - Registry: tr.Registry, - } - //now run the actual upper layer `Registry`'s protocol function - return tr.runProtocol(p, testRw) -} - -//testMsgReadWriter is a custom rw -//it will allow us to re-use the message twice -type testMsgReadWriter struct { - *Registry - p2p.MsgReadWriter - *p2p.Peer - w *netWrapper -} - -//netWrapper wrapper object so we can pass data around -type netWrapper struct { - net *simulations.Network -} - -//set the network to the wrapper for later use (used inside the custom rw) -func (w *netWrapper) setNetwork(n *simulations.Network) { - w.net = n -} - -//get he network from the wrapper (used inside the custom rw) -func (w *netWrapper) getNetwork() *simulations.Network { - return w.net -} - -// ReadMsg reads a message from the underlying MsgReadWriter and emits a -// "message received" event -//we do this because we are interested in the Payload of the message for custom use -//in this test, but messages can only be consumed once (stream io.Reader) -func (ev *testMsgReadWriter) ReadMsg() (p2p.Msg, error) { - //read the message from the underlying rw - msg, err := ev.MsgReadWriter.ReadMsg() - if err != nil { - return msg, err - } - - //don't do anything with message codes we actually are not needing/reading - subCodes := []uint64{1, 2, 10} - found := false - for _, c := range subCodes { - if c == msg.Code { - found = true - } - } - //just return if not a msg code we are interested in - if !found { - return msg, nil - } - - //we use a io.TeeReader so that we can read the message twice - //the Payload is a io.Reader, so if we read from it, the actual protocol handler - //cannot access it anymore. - //But we need that handler to be able to consume the message as normal, - //as if we would not do anything here with that message - var buf bytes.Buffer - tee := io.TeeReader(msg.Payload, &buf) - - mcp := &p2p.Msg{ - Code: msg.Code, - Size: msg.Size, - ReceivedAt: msg.ReceivedAt, - Payload: tee, - } - //assign the copy for later use - msg.Payload = &buf - - //now let's look into the message - var wmsg protocols.WrappedMsg - err = mcp.Decode(&wmsg) - if err != nil { - log.Error(err.Error()) - return msg, err - } - //create a new message from the code - val, ok := ev.Registry.GetSpec().NewMsg(mcp.Code) - if !ok { - return msg, errors.New(fmt.Sprintf("Invalid message code: %v", msg.Code)) - } - //decode it - if err := rlp.DecodeBytes(wmsg.Payload, val); err != nil { - return msg, errors.New(fmt.Sprintf("Decoding error <= %v: %v", msg, err)) - } - //now for every message type we are interested in, create a custom event and send it - var evt *simulations.Event - switch val := val.(type) { - case *OfferedHashesMsg: - evt = &simulations.Event{ - Type: EventTypeChunkOffered, - Node: ev.w.getNetwork().GetNode(ev.ID()), - Control: false, - Data: val.Hashes, - } - case *WantedHashesMsg: - evt = &simulations.Event{ - Type: EventTypeChunkWanted, - Node: ev.w.getNetwork().GetNode(ev.ID()), - Control: false, - } - case *ChunkDeliveryMsgSyncing: - evt = &simulations.Event{ - Type: EventTypeChunkDelivered, - Node: ev.w.getNetwork().GetNode(ev.ID()), - Control: false, - Data: val.Addr.String(), - } - } - if evt != nil { - //send custom event to feed; frontend will listen to it and display - ev.w.getNetwork().Events().Send(evt) - } - return msg, nil -}