Update visualized snapshot test (#18286)
* swarm/network/stream: fix visualized_snapshot_sync_sim_test * swarm/network/stream: updated visualized snapshot-test;data in p2p event * swarm/network/stream: cleanup visualized snapshot sync test * swarm/network/stream: re-enable t.Skip for visualized test * swarm/network/stream: addressed PR comments
This commit is contained in:
		
							parent
							
								
									472c23a801
								
							
						
					
					
						commit
						90ea542e9e
					
				| @ -19,16 +19,27 @@ | ||||
| package stream | ||||
| 
 | ||||
| import ( | ||||
| 	"bytes" | ||||
| 	"context" | ||||
| 	"errors" | ||||
| 	"fmt" | ||||
| 	"io" | ||||
| 	"os" | ||||
| 	"sync" | ||||
| 	"testing" | ||||
| 	"time" | ||||
| 
 | ||||
| 	"github.com/ethereum/go-ethereum/node" | ||||
| 	"github.com/ethereum/go-ethereum/p2p" | ||||
| 	"github.com/ethereum/go-ethereum/p2p/discover" | ||||
| 	"github.com/ethereum/go-ethereum/p2p/enode" | ||||
| 	"github.com/ethereum/go-ethereum/p2p/protocols" | ||||
| 	"github.com/ethereum/go-ethereum/p2p/simulations" | ||||
| 	"github.com/ethereum/go-ethereum/p2p/simulations/adapters" | ||||
| 	"github.com/ethereum/go-ethereum/rlp" | ||||
| 	"github.com/ethereum/go-ethereum/swarm/log" | ||||
| 	"github.com/ethereum/go-ethereum/swarm/network" | ||||
| 	"github.com/ethereum/go-ethereum/swarm/network/simulation" | ||||
| 	"github.com/ethereum/go-ethereum/swarm/state" | ||||
| 	"github.com/ethereum/go-ethereum/swarm/storage" | ||||
| ) | ||||
| 
 | ||||
| @ -68,12 +79,12 @@ func watchSim(sim *simulation.Simulation) (context.Context, context.CancelFunc) | ||||
| 	disconnections := sim.PeerEvents( | ||||
| 		context.Background(), | ||||
| 		sim.NodeIDs(), | ||||
| 		simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeDrop), | ||||
| 		simulation.NewPeerEventsFilter().Drop(), | ||||
| 	) | ||||
| 
 | ||||
| 	go func() { | ||||
| 		for d := range disconnections { | ||||
| 			log.Error("peer drop", "node", d.NodeID, "peer", d.Event.Peer) | ||||
| 			log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID) | ||||
| 			panic("unexpected disconnect") | ||||
| 			cancelSimRun() | ||||
| 		} | ||||
| @ -144,21 +155,75 @@ func sendSimTerminatedEvent(sim *simulation.Simulation) { | ||||
| //It also sends some custom events so that the frontend
 | ||||
| //can visualize messages like SendOfferedMsg, WantedHashesMsg, DeliveryMsg
 | ||||
| func TestSnapshotSyncWithServer(t *testing.T) { | ||||
| 	//t.Skip("temporarily disabled as simulations.WaitTillHealthy cannot be trusted")
 | ||||
| 
 | ||||
| 	//define a wrapper object to be able to pass around data
 | ||||
| 	wrapper := &netWrapper{} | ||||
| 
 | ||||
| 	nodeCount := *nodes | ||||
| 	chunkCount := *chunks | ||||
| 
 | ||||
| 	if nodeCount == 0 || chunkCount == 0 { | ||||
| 		nodeCount = 32 | ||||
| 		chunkCount = 1 | ||||
| 	} | ||||
| 
 | ||||
| 	log.Info(fmt.Sprintf("Running the simulation with %d nodes and %d chunks", nodeCount, chunkCount)) | ||||
| 
 | ||||
| 	sim := simulation.New(map[string]simulation.ServiceFunc{ | ||||
| 		"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { | ||||
| 			n := ctx.Config.Node() | ||||
| 			addr := network.NewAddr(n) | ||||
| 			store, datadir, err := createTestLocalStorageForID(n.ID(), addr) | ||||
| 			if err != nil { | ||||
| 				return nil, nil, err | ||||
| 			} | ||||
| 			bucket.Store(bucketKeyStore, store) | ||||
| 			localStore := store.(*storage.LocalStore) | ||||
| 			netStore, err := storage.NewNetStore(localStore, nil) | ||||
| 			if err != nil { | ||||
| 				return nil, nil, err | ||||
| 			} | ||||
| 			kad := network.NewKademlia(addr.Over(), network.NewKadParams()) | ||||
| 			delivery := NewDelivery(kad, netStore) | ||||
| 			netStore.NewNetFetcherFunc = network.NewFetcherFactory(dummyRequestFromPeers, true).New | ||||
| 
 | ||||
| 			r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ | ||||
| 				Retrieval:       RetrievalDisabled, | ||||
| 				Syncing:         SyncingAutoSubscribe, | ||||
| 				SyncUpdateDelay: 3 * time.Second, | ||||
| 			}, nil) | ||||
| 
 | ||||
| 			tr := &testRegistry{ | ||||
| 				Registry: r, | ||||
| 				w:        wrapper, | ||||
| 			} | ||||
| 
 | ||||
| 			bucket.Store(bucketKeyRegistry, tr) | ||||
| 
 | ||||
| 			cleanup = func() { | ||||
| 				netStore.Close() | ||||
| 				tr.Close() | ||||
| 				os.RemoveAll(datadir) | ||||
| 			} | ||||
| 
 | ||||
| 			return tr, cleanup, nil | ||||
| 		}, | ||||
| 	}).WithServer(":8888") //start with the HTTP server
 | ||||
| 
 | ||||
| 	t.Skip("temporarily disabled as simulations.WaitTillHealthy cannot be trusted") | ||||
| 	nodeCount, chunkCount, sim := setupSim(simServiceMap) | ||||
| 	defer sim.Close() | ||||
| 
 | ||||
| 	log.Info("Initializing test config") | ||||
| 
 | ||||
| 	conf := &synctestConfig{} | ||||
| 	//map of discover ID to indexes of chunks expected at that ID
 | ||||
| 	conf.idToChunksMap = make(map[discover.NodeID][]int) | ||||
| 	conf.idToChunksMap = make(map[enode.ID][]int) | ||||
| 	//map of overlay address to discover ID
 | ||||
| 	conf.addrToIDMap = make(map[string]discover.NodeID) | ||||
| 	conf.addrToIDMap = make(map[string]enode.ID) | ||||
| 	//array where the generated chunk hashes will be stored
 | ||||
| 	conf.hashes = make([]storage.Address, 0) | ||||
| 
 | ||||
| 	//pass the network to the wrapper object
 | ||||
| 	wrapper.setNetwork(sim.Net) | ||||
| 	err := sim.UploadSnapshot(fmt.Sprintf("testing/snapshot_%d.json", nodeCount)) | ||||
| 	if err != nil { | ||||
| 		panic(err) | ||||
| @ -167,49 +232,6 @@ func TestSnapshotSyncWithServer(t *testing.T) { | ||||
| 	ctx, cancelSimRun := watchSim(sim) | ||||
| 	defer cancelSimRun() | ||||
| 
 | ||||
| 	//setup filters in the event feed
 | ||||
| 	offeredHashesFilter := simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeMsgRecv).Protocol("stream").MsgCode(1) | ||||
| 	wantedFilter := simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeMsgRecv).Protocol("stream").MsgCode(2) | ||||
| 	deliveryFilter := simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeMsgRecv).Protocol("stream").MsgCode(6) | ||||
| 	eventC := sim.PeerEvents(ctx, sim.UpNodeIDs(), offeredHashesFilter, wantedFilter, deliveryFilter) | ||||
| 
 | ||||
| 	quit := make(chan struct{}) | ||||
| 
 | ||||
| 	go func() { | ||||
| 		for e := range eventC { | ||||
| 			select { | ||||
| 			case <-quit: | ||||
| 				fmt.Println("quitting event loop") | ||||
| 				return | ||||
| 			default: | ||||
| 			} | ||||
| 			if e.Error != nil { | ||||
| 				t.Fatal(e.Error) | ||||
| 			} | ||||
| 			if *e.Event.MsgCode == uint64(1) { | ||||
| 				evt := &simulations.Event{ | ||||
| 					Type:    EventTypeChunkOffered, | ||||
| 					Node:    sim.Net.GetNode(e.NodeID), | ||||
| 					Control: false, | ||||
| 				} | ||||
| 				sim.Net.Events().Send(evt) | ||||
| 			} else if *e.Event.MsgCode == uint64(2) { | ||||
| 				evt := &simulations.Event{ | ||||
| 					Type:    EventTypeChunkWanted, | ||||
| 					Node:    sim.Net.GetNode(e.NodeID), | ||||
| 					Control: false, | ||||
| 				} | ||||
| 				sim.Net.Events().Send(evt) | ||||
| 			} else if *e.Event.MsgCode == uint64(6) { | ||||
| 				evt := &simulations.Event{ | ||||
| 					Type:    EventTypeChunkDelivered, | ||||
| 					Node:    sim.Net.GetNode(e.NodeID), | ||||
| 					Control: false, | ||||
| 				} | ||||
| 				sim.Net.Events().Send(evt) | ||||
| 			} | ||||
| 		} | ||||
| 	}() | ||||
| 	//run the sim
 | ||||
| 	result := runSim(conf, ctx, sim, chunkCount) | ||||
| 
 | ||||
| @ -218,11 +240,150 @@ func TestSnapshotSyncWithServer(t *testing.T) { | ||||
| 		Type:    EventTypeSimTerminated, | ||||
| 		Control: false, | ||||
| 	} | ||||
| 	sim.Net.Events().Send(evt) | ||||
| 	go sim.Net.Events().Send(evt) | ||||
| 
 | ||||
| 	if result.Error != nil { | ||||
| 		panic(result.Error) | ||||
| 	} | ||||
| 	close(quit) | ||||
| 	log.Info("Simulation ended") | ||||
| } | ||||
| 
 | ||||
| //testRegistry embeds registry
 | ||||
| //it allows to replace the protocol run function
 | ||||
| type testRegistry struct { | ||||
| 	*Registry | ||||
| 	w *netWrapper | ||||
| } | ||||
| 
 | ||||
| //Protocols replaces the protocol's run function
 | ||||
| func (tr *testRegistry) Protocols() []p2p.Protocol { | ||||
| 	regProto := tr.Registry.Protocols() | ||||
| 	//set the `stream` protocol's run function with the testRegistry's one
 | ||||
| 	regProto[0].Run = tr.runProto | ||||
| 	return regProto | ||||
| } | ||||
| 
 | ||||
| //runProto is the new overwritten protocol's run function for this test
 | ||||
| func (tr *testRegistry) runProto(p *p2p.Peer, rw p2p.MsgReadWriter) error { | ||||
| 	//create a custom rw message ReadWriter
 | ||||
| 	testRw := &testMsgReadWriter{ | ||||
| 		MsgReadWriter: rw, | ||||
| 		Peer:          p, | ||||
| 		w:             tr.w, | ||||
| 		Registry:      tr.Registry, | ||||
| 	} | ||||
| 	//now run the actual upper layer `Registry`'s protocol function
 | ||||
| 	return tr.runProtocol(p, testRw) | ||||
| } | ||||
| 
 | ||||
| //testMsgReadWriter is a custom rw
 | ||||
| //it will allow us to re-use the message twice
 | ||||
| type testMsgReadWriter struct { | ||||
| 	*Registry | ||||
| 	p2p.MsgReadWriter | ||||
| 	*p2p.Peer | ||||
| 	w *netWrapper | ||||
| } | ||||
| 
 | ||||
| //netWrapper wrapper object so we can pass data around
 | ||||
| type netWrapper struct { | ||||
| 	net *simulations.Network | ||||
| } | ||||
| 
 | ||||
| //set the network to the wrapper for later use (used inside the custom rw)
 | ||||
| func (w *netWrapper) setNetwork(n *simulations.Network) { | ||||
| 	w.net = n | ||||
| } | ||||
| 
 | ||||
| //get he network from the wrapper (used inside the custom rw)
 | ||||
| func (w *netWrapper) getNetwork() *simulations.Network { | ||||
| 	return w.net | ||||
| } | ||||
| 
 | ||||
| // ReadMsg reads a message from the underlying MsgReadWriter and emits a
 | ||||
| // "message received" event
 | ||||
| //we do this because we are interested in the Payload of the message for custom use
 | ||||
| //in this test, but messages can only be consumed once (stream io.Reader)
 | ||||
| func (ev *testMsgReadWriter) ReadMsg() (p2p.Msg, error) { | ||||
| 	//read the message from the underlying rw
 | ||||
| 	msg, err := ev.MsgReadWriter.ReadMsg() | ||||
| 	if err != nil { | ||||
| 		return msg, err | ||||
| 	} | ||||
| 
 | ||||
| 	//don't do anything with message codes we actually are not needing/reading
 | ||||
| 	subCodes := []uint64{1, 2, 10} | ||||
| 	found := false | ||||
| 	for _, c := range subCodes { | ||||
| 		if c == msg.Code { | ||||
| 			found = true | ||||
| 		} | ||||
| 	} | ||||
| 	//just return if not a msg code we are interested in
 | ||||
| 	if !found { | ||||
| 		return msg, nil | ||||
| 	} | ||||
| 
 | ||||
| 	//we use a io.TeeReader so that we can read the message twice
 | ||||
| 	//the Payload is a io.Reader, so if we read from it, the actual protocol handler
 | ||||
| 	//cannot access it anymore.
 | ||||
| 	//But we need that handler to be able to consume the message as normal,
 | ||||
| 	//as if we would not do anything here with that message
 | ||||
| 	var buf bytes.Buffer | ||||
| 	tee := io.TeeReader(msg.Payload, &buf) | ||||
| 
 | ||||
| 	mcp := &p2p.Msg{ | ||||
| 		Code:       msg.Code, | ||||
| 		Size:       msg.Size, | ||||
| 		ReceivedAt: msg.ReceivedAt, | ||||
| 		Payload:    tee, | ||||
| 	} | ||||
| 	//assign the copy for later use
 | ||||
| 	msg.Payload = &buf | ||||
| 
 | ||||
| 	//now let's look into the message
 | ||||
| 	var wmsg protocols.WrappedMsg | ||||
| 	err = mcp.Decode(&wmsg) | ||||
| 	if err != nil { | ||||
| 		log.Error(err.Error()) | ||||
| 		return msg, err | ||||
| 	} | ||||
| 	//create a new message from the code
 | ||||
| 	val, ok := ev.Registry.GetSpec().NewMsg(mcp.Code) | ||||
| 	if !ok { | ||||
| 		return msg, errors.New(fmt.Sprintf("Invalid message code: %v", msg.Code)) | ||||
| 	} | ||||
| 	//decode it
 | ||||
| 	if err := rlp.DecodeBytes(wmsg.Payload, val); err != nil { | ||||
| 		return msg, errors.New(fmt.Sprintf("Decoding error <= %v: %v", msg, err)) | ||||
| 	} | ||||
| 	//now for every message type we are interested in, create a custom event and send it
 | ||||
| 	var evt *simulations.Event | ||||
| 	switch val := val.(type) { | ||||
| 	case *OfferedHashesMsg: | ||||
| 		evt = &simulations.Event{ | ||||
| 			Type:    EventTypeChunkOffered, | ||||
| 			Node:    ev.w.getNetwork().GetNode(ev.ID()), | ||||
| 			Control: false, | ||||
| 			Data:    val.Hashes, | ||||
| 		} | ||||
| 	case *WantedHashesMsg: | ||||
| 		evt = &simulations.Event{ | ||||
| 			Type:    EventTypeChunkWanted, | ||||
| 			Node:    ev.w.getNetwork().GetNode(ev.ID()), | ||||
| 			Control: false, | ||||
| 		} | ||||
| 	case *ChunkDeliveryMsgSyncing: | ||||
| 		evt = &simulations.Event{ | ||||
| 			Type:    EventTypeChunkDelivered, | ||||
| 			Node:    ev.w.getNetwork().GetNode(ev.ID()), | ||||
| 			Control: false, | ||||
| 			Data:    val.Addr.String(), | ||||
| 		} | ||||
| 	} | ||||
| 	if evt != nil { | ||||
| 		//send custom event to feed; frontend will listen to it and display
 | ||||
| 		ev.w.getNetwork().Events().Send(evt) | ||||
| 	} | ||||
| 	return msg, nil | ||||
| } | ||||
|  | ||||
		Loading…
	
		Reference in New Issue
	
	Block a user