From d6efa691872efb723ea3177a92da9e9b31c34eba Mon Sep 17 00:00:00 2001 From: holisticode Date: Mon, 30 Jul 2018 15:55:25 -0500 Subject: [PATCH] Merge netsim mig to master (#17241) * swarm: merged stream-tests migration to develop * swarm/network: expose simulation RandomUpNode to use in stream tests * swarm/network: wait for subs in PeerEvents and fix stream.runSyncTest * swarm: enforce waitkademlia for snapshot tests * swarm: fixed syncer tests and snapshot_sync_test * swarm: linting of simulation package * swarm: address review comments * swarm/network/stream: fix delivery_test bugs and refactor * swarm/network/stream: addressed PR comments @janos * swarm/network/stream: enforce waitKademlia, improve TestIntervals * swarm/network/stream: TestIntervals not waiting for chunk to be stored --- swarm/network/simulation/bucket.go | 2 +- swarm/network/simulation/connect.go | 4 +- swarm/network/simulation/events.go | 11 + swarm/network/simulation/http.go | 11 +- swarm/network/simulation/http_test.go | 7 +- swarm/network/simulation/node.go | 19 +- swarm/network/simulation/service.go | 2 +- swarm/network/stream/common_test.go | 368 ++------ swarm/network/stream/delivery_test.go | 576 +++++------- swarm/network/stream/intervals_test.go | 552 ++++++----- .../network/stream/snapshot_retrieval_test.go | 887 +++++------------- swarm/network/stream/snapshot_sync_test.go | 809 +++++++--------- swarm/network/stream/syncer_test.go | 349 ++++--- swarm/network/stream/testing/testing.go | 293 ------ 14 files changed, 1411 insertions(+), 2479 deletions(-) delete mode 100644 swarm/network/stream/testing/testing.go diff --git a/swarm/network/simulation/bucket.go b/swarm/network/simulation/bucket.go index b37afaaa4..ddbedb521 100644 --- a/swarm/network/simulation/bucket.go +++ b/swarm/network/simulation/bucket.go @@ -43,7 +43,7 @@ func (s *Simulation) SetNodeItem(id discover.NodeID, key interface{}, value inte s.buckets[id].Store(key, value) } -// NodeItems returns a map of items from all nodes that are all set under the +// NodesItems returns a map of items from all nodes that are all set under the // same BucketKey. func (s *Simulation) NodesItems(key interface{}) (values map[discover.NodeID]interface{}) { s.mu.RLock() diff --git a/swarm/network/simulation/connect.go b/swarm/network/simulation/connect.go index 3fe82052b..3d0f6cb3f 100644 --- a/swarm/network/simulation/connect.go +++ b/swarm/network/simulation/connect.go @@ -54,7 +54,7 @@ func (s *Simulation) ConnectToLastNode(id discover.NodeID) (err error) { // ConnectToRandomNode connects the node with provieded NodeID // to a random node that is up. func (s *Simulation) ConnectToRandomNode(id discover.NodeID) (err error) { - n := s.randomUpNode(id) + n := s.RandomUpNode(id) if n == nil { return ErrNodeNotFound } @@ -135,7 +135,7 @@ func (s *Simulation) ConnectNodesStar(id discover.NodeID, ids []discover.NodeID) return nil } -// ConnectNodesStar connects all nodes in a star topology +// ConnectNodesStarPivot connects all nodes in a star topology // with the center at already set pivot node. // If ids argument is nil, all nodes that are up will be connected. func (s *Simulation) ConnectNodesStarPivot(ids []discover.NodeID) (err error) { diff --git a/swarm/network/simulation/events.go b/swarm/network/simulation/events.go index f9cfadb73..980a9a756 100644 --- a/swarm/network/simulation/events.go +++ b/swarm/network/simulation/events.go @@ -18,6 +18,7 @@ package simulation import ( "context" + "sync" "github.com/ethereum/go-ethereum/p2p/discover" @@ -71,24 +72,32 @@ func (f *PeerEventsFilter) MsgCode(c uint64) *PeerEventsFilter { func (s *Simulation) PeerEvents(ctx context.Context, ids []discover.NodeID, filters ...*PeerEventsFilter) <-chan PeerEvent { eventC := make(chan PeerEvent) + // wait group to make sure all subscriptions to admin peerEvents are established + // before this function returns. + var subsWG sync.WaitGroup for _, id := range ids { s.shutdownWG.Add(1) + subsWG.Add(1) go func(id discover.NodeID) { defer s.shutdownWG.Done() client, err := s.Net.GetNode(id).Client() if err != nil { + subsWG.Done() eventC <- PeerEvent{NodeID: id, Error: err} return } events := make(chan *p2p.PeerEvent) sub, err := client.Subscribe(ctx, "admin", events, "peerEvents") if err != nil { + subsWG.Done() eventC <- PeerEvent{NodeID: id, Error: err} return } defer sub.Unsubscribe() + subsWG.Done() + for { select { case <-ctx.Done(): @@ -153,5 +162,7 @@ func (s *Simulation) PeerEvents(ctx context.Context, ids []discover.NodeID, filt }(id) } + // wait all subscriptions + subsWG.Wait() return eventC } diff --git a/swarm/network/simulation/http.go b/swarm/network/simulation/http.go index 40f13f32d..69ae3baec 100644 --- a/swarm/network/simulation/http.go +++ b/swarm/network/simulation/http.go @@ -29,7 +29,7 @@ var ( DefaultHTTPSimAddr = ":8888" ) -//`With`(builder) pattern constructor for Simulation to +//WithServer implements the builder pattern constructor for Simulation to //start with a HTTP server func (s *Simulation) WithServer(addr string) *Simulation { //assign default addr if nothing provided @@ -46,7 +46,12 @@ func (s *Simulation) WithServer(addr string) *Simulation { Addr: addr, Handler: s.handler, } - go s.httpSrv.ListenAndServe() + go func() { + err := s.httpSrv.ListenAndServe() + if err != nil { + log.Error("Error starting the HTTP server", "error", err) + } + }() return s } @@ -55,7 +60,7 @@ func (s *Simulation) addSimulationRoutes() { s.handler.POST("/runsim", s.RunSimulation) } -// StartNetwork starts all nodes in the network +// RunSimulation is the actual POST endpoint runner func (s *Simulation) RunSimulation(w http.ResponseWriter, req *http.Request) { log.Debug("RunSimulation endpoint running") s.runC <- struct{}{} diff --git a/swarm/network/simulation/http_test.go b/swarm/network/simulation/http_test.go index 4d8bf9946..775cf9219 100644 --- a/swarm/network/simulation/http_test.go +++ b/swarm/network/simulation/http_test.go @@ -96,7 +96,12 @@ func sendRunSignal(t *testing.T) { if err != nil { t.Fatalf("Request failed: %v", err) } - defer resp.Body.Close() + defer func() { + err := resp.Body.Close() + if err != nil { + log.Error("Error closing response body", "err", err) + } + }() log.Debug("Signal sent") if resp.StatusCode != http.StatusOK { t.Fatalf("err %s", resp.Status) diff --git a/swarm/network/simulation/node.go b/swarm/network/simulation/node.go index bc433cfd8..784588fa6 100644 --- a/swarm/network/simulation/node.go +++ b/swarm/network/simulation/node.go @@ -195,7 +195,7 @@ func (s *Simulation) AddNodesAndConnectStar(count int, opts ...AddNodeOption) (i return ids, nil } -//Upload a snapshot +//UploadSnapshot uploads a snapshot to the simulation //This method tries to open the json file provided, applies the config to all nodes //and then loads the snapshot into the Simulation network func (s *Simulation) UploadSnapshot(snapshotFile string, opts ...AddNodeOption) error { @@ -203,7 +203,12 @@ func (s *Simulation) UploadSnapshot(snapshotFile string, opts ...AddNodeOption) if err != nil { return err } - defer f.Close() + defer func() { + err := f.Close() + if err != nil { + log.Error("Error closing snapshot file", "err", err) + } + }() jsonbyte, err := ioutil.ReadAll(f) if err != nil { return err @@ -294,7 +299,7 @@ func (s *Simulation) StopNode(id discover.NodeID) (err error) { // StopRandomNode stops a random node. func (s *Simulation) StopRandomNode() (id discover.NodeID, err error) { - n := s.randomUpNode() + n := s.RandomUpNode() if n == nil { return id, ErrNodeNotFound } @@ -324,18 +329,18 @@ func init() { rand.Seed(time.Now().UnixNano()) } -// randomUpNode returns a random SimNode that is up. +// RandomUpNode returns a random SimNode that is up. // Arguments are NodeIDs for nodes that should not be returned. -func (s *Simulation) randomUpNode(exclude ...discover.NodeID) *adapters.SimNode { +func (s *Simulation) RandomUpNode(exclude ...discover.NodeID) *adapters.SimNode { return s.randomNode(s.UpNodeIDs(), exclude...) } -// randomUpNode returns a random SimNode that is not up. +// randomDownNode returns a random SimNode that is not up. func (s *Simulation) randomDownNode(exclude ...discover.NodeID) *adapters.SimNode { return s.randomNode(s.DownNodeIDs(), exclude...) } -// randomUpNode returns a random SimNode from the slice of NodeIDs. +// randomNode returns a random SimNode from the slice of NodeIDs. func (s *Simulation) randomNode(ids []discover.NodeID, exclude ...discover.NodeID) *adapters.SimNode { for _, e := range exclude { var i int diff --git a/swarm/network/simulation/service.go b/swarm/network/simulation/service.go index d1cbf1f8b..02e7ad0cc 100644 --- a/swarm/network/simulation/service.go +++ b/swarm/network/simulation/service.go @@ -39,7 +39,7 @@ func (s *Simulation) Service(name string, id discover.NodeID) node.Service { // RandomService returns a single Service by name on a // randomly chosen node that is up. func (s *Simulation) RandomService(name string) node.Service { - n := s.randomUpNode() + n := s.RandomUpNode() if n == nil { return nil } diff --git a/swarm/network/stream/common_test.go b/swarm/network/stream/common_test.go index 4d55c6ee3..491dc9fd5 100644 --- a/swarm/network/stream/common_test.go +++ b/swarm/network/stream/common_test.go @@ -18,135 +18,70 @@ package stream import ( "context" - "encoding/binary" + crand "crypto/rand" "errors" "flag" "fmt" "io" "io/ioutil" + "math/rand" "os" + "strings" "sync/atomic" "testing" "time" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/node" - "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/discover" - "github.com/ethereum/go-ethereum/p2p/simulations/adapters" p2ptest "github.com/ethereum/go-ethereum/p2p/testing" - "github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/swarm/network" + "github.com/ethereum/go-ethereum/swarm/network/simulation" + "github.com/ethereum/go-ethereum/swarm/pot" "github.com/ethereum/go-ethereum/swarm/state" "github.com/ethereum/go-ethereum/swarm/storage" - "github.com/ethereum/go-ethereum/swarm/storage/mock" - "github.com/ethereum/go-ethereum/swarm/storage/mock/db" + mockdb "github.com/ethereum/go-ethereum/swarm/storage/mock/db" colorable "github.com/mattn/go-colorable" ) var ( - deliveries map[discover.NodeID]*Delivery - stores map[discover.NodeID]storage.ChunkStore - toAddr func(discover.NodeID) *network.BzzAddr - peerCount func(discover.NodeID) int - adapter = flag.String("adapter", "sim", "type of simulation: sim|exec|docker") loglevel = flag.Int("loglevel", 2, "verbosity of logs") nodes = flag.Int("nodes", 0, "number of nodes") chunks = flag.Int("chunks", 0, "number of chunks") useMockStore = flag.Bool("mockstore", false, "disabled mock store (default: enabled)") -) + longrunning = flag.Bool("longrunning", false, "do run long-running tests") -var ( - defaultSkipCheck bool - waitPeerErrC chan error - chunkSize = 4096 - registries map[discover.NodeID]*TestRegistry - createStoreFunc func(id discover.NodeID, addr *network.BzzAddr) (storage.ChunkStore, error) - getRetrieveFunc = defaultRetrieveFunc - subscriptionCount = 0 - globalStore mock.GlobalStorer - globalStoreDir string -) + bucketKeyDB = simulation.BucketKey("db") + bucketKeyStore = simulation.BucketKey("store") + bucketKeyFileStore = simulation.BucketKey("filestore") + bucketKeyNetStore = simulation.BucketKey("netstore") + bucketKeyDelivery = simulation.BucketKey("delivery") + bucketKeyRegistry = simulation.BucketKey("registry") -var services = adapters.Services{ - "streamer": NewStreamerService, - "intervalsStreamer": newIntervalsStreamerService, -} + chunkSize = 4096 + pof = pot.DefaultPof(256) +) func init() { flag.Parse() - // register the Delivery service which will run as a devp2p - // protocol when using the exec adapter - adapters.RegisterServices(services) + rand.Seed(time.Now().UnixNano()) log.PrintOrigins(true) log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(*loglevel), log.StreamHandler(colorable.NewColorableStderr(), log.TerminalFormat(true)))) } -func createGlobalStore() { - var err error - globalStoreDir, err = ioutil.TempDir("", "global.store") +func createGlobalStore() (string, *mockdb.GlobalStore, error) { + var globalStore *mockdb.GlobalStore + globalStoreDir, err := ioutil.TempDir("", "global.store") if err != nil { log.Error("Error initiating global store temp directory!", "err", err) - return + return "", nil, err } - globalStore, err = db.NewGlobalStore(globalStoreDir) + globalStore, err = mockdb.NewGlobalStore(globalStoreDir) if err != nil { log.Error("Error initiating global store!", "err", err) + return "", nil, err } -} - -// NewStreamerService -func NewStreamerService(ctx *adapters.ServiceContext) (node.Service, error) { - var err error - id := ctx.Config.ID - addr := toAddr(id) - kad := network.NewKademlia(addr.Over(), network.NewKadParams()) - stores[id], err = createStoreFunc(id, addr) - if err != nil { - return nil, err - } - store := stores[id].(*storage.LocalStore) - db := storage.NewDBAPI(store) - delivery := NewDelivery(kad, db) - deliveries[id] = delivery - r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{ - SkipCheck: defaultSkipCheck, - DoRetrieve: false, - }) - RegisterSwarmSyncerServer(r, db) - RegisterSwarmSyncerClient(r, db) - go func() { - waitPeerErrC <- waitForPeers(r, 1*time.Second, peerCount(id)) - }() - fileStore := storage.NewFileStore(storage.NewNetStore(store, getRetrieveFunc(id)), storage.NewFileStoreParams()) - testRegistry := &TestRegistry{Registry: r, fileStore: fileStore} - registries[id] = testRegistry - return testRegistry, nil -} - -func defaultRetrieveFunc(id discover.NodeID) func(ctx context.Context, chunk *storage.Chunk) error { - return nil -} - -func datadirsCleanup() { - for _, id := range ids { - os.RemoveAll(datadirs[id]) - } - if globalStoreDir != "" { - os.RemoveAll(globalStoreDir) - } -} - -//local stores need to be cleaned up after the sim is done -func localStoreCleanup() { - log.Info("Cleaning up...") - for _, id := range ids { - registries[id].Close() - stores[id].Close() - } - log.Info("Local store cleanup done") + return globalStoreDir, globalStore, nil } func newStreamerTester(t *testing.T) (*p2ptest.ProtocolTester, *Registry, *storage.LocalStore, func(), error) { @@ -174,9 +109,7 @@ func newStreamerTester(t *testing.T) (*p2ptest.ProtocolTester, *Registry, *stora db := storage.NewDBAPI(localStore) delivery := NewDelivery(to, db) - streamer := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{ - SkipCheck: defaultSkipCheck, - }) + streamer := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), nil) teardown := func() { streamer.Close() removeDataDir() @@ -233,22 +166,6 @@ func (rrs *roundRobinStore) Close() { } } -type TestRegistry struct { - *Registry - fileStore *storage.FileStore -} - -func (r *TestRegistry) APIs() []rpc.API { - a := r.Registry.APIs() - a = append(a, rpc.API{ - Namespace: "stream", - Version: "3.0", - Service: r, - Public: true, - }) - return a -} - func readAll(fileStore *storage.FileStore, hash []byte) (int64, error) { r, _ := fileStore.Retrieve(context.TODO(), hash) buf := make([]byte, 1024) @@ -265,185 +182,74 @@ func readAll(fileStore *storage.FileStore, hash []byte) (int64, error) { return total, nil } -func (r *TestRegistry) ReadAll(hash common.Hash) (int64, error) { - return readAll(r.fileStore, hash[:]) -} +func uploadFilesToNodes(sim *simulation.Simulation) ([]storage.Address, []string, error) { + nodes := sim.UpNodeIDs() + nodeCnt := len(nodes) + log.Debug(fmt.Sprintf("Uploading %d files to nodes", nodeCnt)) + //array holding generated files + rfiles := make([]string, nodeCnt) + //array holding the root hashes of the files + rootAddrs := make([]storage.Address, nodeCnt) -func (r *TestRegistry) Start(server *p2p.Server) error { - return r.Registry.Start(server) -} - -func (r *TestRegistry) Stop() error { - return r.Registry.Stop() -} - -type TestExternalRegistry struct { - *Registry -} - -func (r *TestExternalRegistry) APIs() []rpc.API { - a := r.Registry.APIs() - a = append(a, rpc.API{ - Namespace: "stream", - Version: "3.0", - Service: r, - Public: true, - }) - return a -} - -func (r *TestExternalRegistry) GetHashes(ctx context.Context, peerId discover.NodeID, s Stream) (*rpc.Subscription, error) { - peer := r.getPeer(peerId) - - client, err := peer.getClient(ctx, s) - if err != nil { - return nil, err - } - - c := client.Client.(*testExternalClient) - - notifier, supported := rpc.NotifierFromContext(ctx) - if !supported { - return nil, fmt.Errorf("Subscribe not supported") - } - - sub := notifier.CreateSubscription() - - go func() { - // if we begin sending event immediately some events - // will probably be dropped since the subscription ID might not be send to - // the client. - // ref: rpc/subscription_test.go#L65 - time.Sleep(1 * time.Second) - for { - select { - case h := <-c.hashes: - <-c.enableNotificationsC // wait for notification subscription to complete - if err := notifier.Notify(sub.ID, h); err != nil { - log.Warn(fmt.Sprintf("rpc sub notifier notify stream %s: %v", s, err)) - } - case err := <-sub.Err(): - if err != nil { - log.Warn(fmt.Sprintf("caught subscription error in stream %s: %v", s, err)) - } - case <-notifier.Closed(): - log.Trace(fmt.Sprintf("rpc sub notifier closed")) - return - } + var err error + //for every node, generate a file and upload + for i, id := range nodes { + item, ok := sim.NodeItem(id, bucketKeyFileStore) + if !ok { + return nil, nil, fmt.Errorf("Error accessing localstore") } - }() - - return sub, nil + fileStore := item.(*storage.FileStore) + //generate a file + rfiles[i], err = generateRandomFile() + if err != nil { + return nil, nil, err + } + //store it (upload it) on the FileStore + ctx := context.TODO() + rk, wait, err := fileStore.Store(ctx, strings.NewReader(rfiles[i]), int64(len(rfiles[i])), false) + log.Debug("Uploaded random string file to node") + if err != nil { + return nil, nil, err + } + err = wait(ctx) + if err != nil { + return nil, nil, err + } + rootAddrs[i] = rk + } + return rootAddrs, rfiles, nil } -func (r *TestExternalRegistry) EnableNotifications(peerId discover.NodeID, s Stream) error { - peer := r.getPeer(peerId) - - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - - client, err := peer.getClient(ctx, s) +//generate a random file (string) +func generateRandomFile() (string, error) { + //generate a random file size between minFileSize and maxFileSize + fileSize := rand.Intn(maxFileSize-minFileSize) + minFileSize + log.Debug(fmt.Sprintf("Generated file with filesize %d kB", fileSize)) + b := make([]byte, fileSize*1024) + _, err := crand.Read(b) if err != nil { - return err + log.Error("Error generating random file.", "err", err) + return "", err } - - close(client.Client.(*testExternalClient).enableNotificationsC) - - return nil + return string(b), nil } -// TODO: merge functionalities of testExternalClient and testExternalServer -// with testClient and testServer. - -type testExternalClient struct { - hashes chan []byte - db *storage.DBAPI - enableNotificationsC chan struct{} -} - -func newTestExternalClient(db *storage.DBAPI) *testExternalClient { - return &testExternalClient{ - hashes: make(chan []byte), - db: db, - enableNotificationsC: make(chan struct{}), - } -} - -func (c *testExternalClient) NeedData(ctx context.Context, hash []byte) func() { - chunk, _ := c.db.GetOrCreateRequest(ctx, hash) - if chunk.ReqC == nil { - return nil - } - c.hashes <- hash - return func() { - chunk.WaitToStore() - } -} - -func (c *testExternalClient) BatchDone(Stream, uint64, []byte, []byte) func() (*TakeoverProof, error) { - return nil -} - -func (c *testExternalClient) Close() {} - -const testExternalServerBatchSize = 10 - -type testExternalServer struct { - t string - keyFunc func(key []byte, index uint64) - sessionAt uint64 - maxKeys uint64 - streamer *TestExternalRegistry -} - -func newTestExternalServer(t string, sessionAt, maxKeys uint64, keyFunc func(key []byte, index uint64)) *testExternalServer { - if keyFunc == nil { - keyFunc = binary.BigEndian.PutUint64 - } - return &testExternalServer{ - t: t, - keyFunc: keyFunc, - sessionAt: sessionAt, - maxKeys: maxKeys, - } -} - -func (s *testExternalServer) SetNextBatch(from uint64, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) { - if from == 0 && to == 0 { - from = s.sessionAt - to = s.sessionAt + testExternalServerBatchSize - } - if to-from > testExternalServerBatchSize { - to = from + testExternalServerBatchSize - 1 - } - if from >= s.maxKeys && to > s.maxKeys { - return nil, 0, 0, nil, io.EOF - } - if to > s.maxKeys { - to = s.maxKeys - } - b := make([]byte, HashSize*(to-from+1)) - for i := from; i <= to; i++ { - s.keyFunc(b[(i-from)*HashSize:(i-from+1)*HashSize], i) - } - return b, from, to, nil, nil -} - -func (s *testExternalServer) GetData(context.Context, []byte) ([]byte, error) { - return make([]byte, 4096), nil -} - -func (s *testExternalServer) Close() {} - -// Sets the global value defaultSkipCheck. -// It should be used in test function defer to reset the global value -// to the original value. -// -// defer setDefaultSkipCheck(defaultSkipCheck) -// defaultSkipCheck = skipCheck -// -// This works as defer function arguments evaluations are evaluated as ususal, -// but only the function body invocation is deferred. -func setDefaultSkipCheck(skipCheck bool) { - defaultSkipCheck = skipCheck +//create a local store for the given node +func createTestLocalStorageForID(id discover.NodeID, addr *network.BzzAddr) (storage.ChunkStore, string, error) { + var datadir string + var err error + datadir, err = ioutil.TempDir("", fmt.Sprintf("syncer-test-%s", id.TerminalString())) + if err != nil { + return nil, "", err + } + var store storage.ChunkStore + params := storage.NewDefaultLocalStoreParams() + params.ChunkDbPath = datadir + params.BaseKey = addr.Over() + store, err = storage.NewTestLocalStoreForAddr(params) + if err != nil { + os.RemoveAll(datadir) + return nil, "", err + } + return store, datadir, nil } diff --git a/swarm/network/stream/delivery_test.go b/swarm/network/stream/delivery_test.go index f3da893a2..ae007e5b0 100644 --- a/swarm/network/stream/delivery_test.go +++ b/swarm/network/stream/delivery_test.go @@ -22,18 +22,19 @@ import ( crand "crypto/rand" "fmt" "io" + "os" "sync" "testing" "time" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/p2p/discover" - "github.com/ethereum/go-ethereum/p2p/simulations" + "github.com/ethereum/go-ethereum/node" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/simulations/adapters" p2ptest "github.com/ethereum/go-ethereum/p2p/testing" - "github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/swarm/log" "github.com/ethereum/go-ethereum/swarm/network" - streamTesting "github.com/ethereum/go-ethereum/swarm/network/stream/testing" + "github.com/ethereum/go-ethereum/swarm/network/simulation" + "github.com/ethereum/go-ethereum/swarm/state" "github.com/ethereum/go-ethereum/swarm/storage" ) @@ -308,159 +309,164 @@ func TestDeliveryFromNodes(t *testing.T) { } func testDeliveryFromNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck bool) { - defaultSkipCheck = skipCheck - toAddr = network.NewAddrFromNodeID - createStoreFunc = createTestLocalStorageFromSim - conf := &streamTesting.RunConfig{ - Adapter: *adapter, - NodeCount: nodes, - ConnLevel: conns, - ToAddr: toAddr, - Services: services, - EnableMsgEvents: false, - } + sim := simulation.New(map[string]simulation.ServiceFunc{ + "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { - sim, teardown, err := streamTesting.NewSimulation(conf) - var rpcSubscriptionsWg sync.WaitGroup - defer func() { - rpcSubscriptionsWg.Wait() - teardown() - }() - if err != nil { - t.Fatal(err.Error()) - } - stores = make(map[discover.NodeID]storage.ChunkStore) - for i, id := range sim.IDs { - stores[id] = sim.Stores[i] - } - registries = make(map[discover.NodeID]*TestRegistry) - deliveries = make(map[discover.NodeID]*Delivery) - peerCount = func(id discover.NodeID) int { - if sim.IDs[0] == id || sim.IDs[nodes-1] == id { - return 1 - } - return 2 - } - - // here we distribute chunks of a random file into Stores of nodes 1 to nodes - rrFileStore := storage.NewFileStore(newRoundRobinStore(sim.Stores[1:]...), storage.NewFileStoreParams()) - size := chunkCount * chunkSize - ctx := context.TODO() - fileHash, wait, err := rrFileStore.Store(ctx, io.LimitReader(crand.Reader, int64(size)), int64(size), false) - // wait until all chunks stored - if err != nil { - t.Fatal(err.Error()) - } - err = wait(ctx) - if err != nil { - t.Fatal(err.Error()) - } - errc := make(chan error, 1) - waitPeerErrC = make(chan error) - quitC := make(chan struct{}) - defer close(quitC) - - action := func(ctx context.Context) error { - // each node Subscribes to each other's swarmChunkServerStreamName - // need to wait till an aynchronous process registers the peers in streamer.peers - // that is used by Subscribe - // using a global err channel to share betweem action and node service - i := 0 - for err := range waitPeerErrC { + id := ctx.Config.ID + addr := network.NewAddrFromNodeID(id) + store, datadir, err := createTestLocalStorageForID(id, addr) if err != nil { - return fmt.Errorf("error waiting for peers: %s", err) + return nil, nil, err } + bucket.Store(bucketKeyStore, store) + cleanup = func() { + os.RemoveAll(datadir) + store.Close() + } + localStore := store.(*storage.LocalStore) + db := storage.NewDBAPI(localStore) + kad := network.NewKademlia(addr.Over(), network.NewKadParams()) + delivery := NewDelivery(kad, db) + + r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{ + SkipCheck: skipCheck, + }) + bucket.Store(bucketKeyRegistry, r) + + retrieveFunc := func(ctx context.Context, chunk *storage.Chunk) error { + return delivery.RequestFromPeers(ctx, chunk.Addr[:], skipCheck) + } + netStore := storage.NewNetStore(localStore, retrieveFunc) + fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams()) + bucket.Store(bucketKeyFileStore, fileStore) + + return r, cleanup, nil + + }, + }) + defer sim.Close() + + log.Info("Adding nodes to simulation") + _, err := sim.AddNodesAndConnectChain(nodes) + if err != nil { + t.Fatal(err) + } + + log.Info("Starting simulation") + ctx := context.Background() + result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error { + nodeIDs := sim.UpNodeIDs() + //determine the pivot node to be the first node of the simulation + sim.SetPivotNode(nodeIDs[0]) + //distribute chunks of a random file into Stores of nodes 1 to nodes + //we will do this by creating a file store with an underlying round-robin store: + //the file store will create a hash for the uploaded file, but every chunk will be + //distributed to different nodes via round-robin scheduling + log.Debug("Writing file to round-robin file store") + //to do this, we create an array for chunkstores (length minus one, the pivot node) + stores := make([]storage.ChunkStore, len(nodeIDs)-1) + //we then need to get all stores from the sim.... + lStores := sim.NodesItems(bucketKeyStore) + i := 0 + //...iterate the buckets... + for id, bucketVal := range lStores { + //...and remove the one which is the pivot node + if id == *sim.PivotNodeID() { + continue + } + //the other ones are added to the array... + stores[i] = bucketVal.(storage.ChunkStore) i++ - if i == nodes { - break - } + } + //...which then gets passed to the round-robin file store + roundRobinFileStore := storage.NewFileStore(newRoundRobinStore(stores...), storage.NewFileStoreParams()) + //now we can actually upload a (random) file to the round-robin store + size := chunkCount * chunkSize + log.Debug("Storing data to file store") + fileHash, wait, err := roundRobinFileStore.Store(ctx, io.LimitReader(crand.Reader, int64(size)), int64(size), false) + // wait until all chunks stored + if err != nil { + return err + } + err = wait(ctx) + if err != nil { + return err } - // each node subscribes to the upstream swarm chunk server stream - // which responds to chunk retrieve requests all but the last node in the chain does not - for j := 0; j < nodes-1; j++ { - id := sim.IDs[j] - err := sim.CallClient(id, func(client *rpc.Client) error { - doneC, err := streamTesting.WatchDisconnections(id, client, errc, quitC) - if err != nil { - return err - } - rpcSubscriptionsWg.Add(1) - go func() { - <-doneC - rpcSubscriptionsWg.Done() - }() - ctx, cancel := context.WithTimeout(ctx, 1*time.Second) - defer cancel() - sid := sim.IDs[j+1] - return client.CallContext(ctx, nil, "stream_subscribeStream", sid, NewStream(swarmChunkServerStreamName, "", false), NewRange(0, 0), Top) - }) + //each of the nodes (except pivot node) subscribes to the stream of the next node + for j, node := range nodeIDs[0 : nodes-1] { + sid := nodeIDs[j+1] + item, ok := sim.NodeItem(node, bucketKeyRegistry) + if !ok { + return fmt.Errorf("No registry") + } + registry := item.(*Registry) + err = registry.Subscribe(sid, NewStream(swarmChunkServerStreamName, "", false), NewRange(0, 0), Top) if err != nil { return err } } - // create a retriever FileStore for the pivot node - delivery := deliveries[sim.IDs[0]] - retrieveFunc := func(ctx context.Context, chunk *storage.Chunk) error { - return delivery.RequestFromPeers(ctx, chunk.Addr[:], skipCheck) - } - netStore := storage.NewNetStore(sim.Stores[0].(*storage.LocalStore), retrieveFunc) - fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams()) + //get the pivot node's filestore + item, ok := sim.NodeItem(*sim.PivotNodeID(), bucketKeyFileStore) + if !ok { + return fmt.Errorf("No filestore") + } + pivotFileStore := item.(*storage.FileStore) + log.Debug("Starting retrieval routine") go func() { // start the retrieval on the pivot node - this will spawn retrieve requests for missing chunks // we must wait for the peer connections to have started before requesting - n, err := readAll(fileStore, fileHash) + n, err := readAll(pivotFileStore, fileHash) log.Info(fmt.Sprintf("retrieved %v", fileHash), "read", n, "err", err) if err != nil { - errc <- fmt.Errorf("requesting chunks action error: %v", err) + t.Fatalf("requesting chunks action error: %v", err) } }() - return nil - } - check := func(ctx context.Context, id discover.NodeID) (bool, error) { - select { - case err := <-errc: - return false, err - case <-ctx.Done(): - return false, ctx.Err() - default: + + log.Debug("Waiting for kademlia") + if _, err := sim.WaitTillHealthy(ctx, 2); err != nil { + return err } + + log.Debug("Watching for disconnections") + disconnections := sim.PeerEvents( + context.Background(), + sim.NodeIDs(), + simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeDrop), + ) + + go func() { + for d := range disconnections { + if d.Error != nil { + log.Error("peer drop", "node", d.NodeID, "peer", d.Event.Peer) + t.Fatal(d.Error) + } + } + }() + + //finally check that the pivot node gets all chunks via the root hash + log.Debug("Check retrieval") + success := true var total int64 - err := sim.CallClient(id, func(client *rpc.Client) error { - ctx, cancel := context.WithTimeout(ctx, 5*time.Second) - defer cancel() - return client.CallContext(ctx, &total, "stream_readAll", common.BytesToHash(fileHash)) - }) + total, err = readAll(pivotFileStore, fileHash) + if err != nil { + return err + } log.Info(fmt.Sprintf("check if %08x is available locally: number of bytes read %v/%v (error: %v)", fileHash, total, size, err)) if err != nil || total != int64(size) { - return false, nil + success = false } - return true, nil - } - conf.Step = &simulations.Step{ - Action: action, - Trigger: streamTesting.Trigger(10*time.Millisecond, quitC, sim.IDs[0]), - // we are only testing the pivot node (net.Nodes[0]) - Expect: &simulations.Expectation{ - Nodes: sim.IDs[0:1], - Check: check, - }, - } - startedAt := time.Now() - timeout := 300 * time.Second - ctx, cancel := context.WithTimeout(context.Background(), timeout) - defer cancel() - result, err := sim.Run(ctx, conf) - finishedAt := time.Now() - if err != nil { - t.Fatalf("Setting up simulation failed: %v", err) - } + if !success { + return fmt.Errorf("Test failed, chunks not available on all nodes") + } + log.Debug("Test terminated successfully") + return nil + }) if result.Error != nil { - t.Fatalf("Simulation failed: %s", result.Error) + t.Fatal(result.Error) } - streamTesting.CheckResult(t, result, startedAt, finishedAt) } func BenchmarkDeliveryFromNodesWithoutCheck(b *testing.B) { @@ -490,218 +496,146 @@ func BenchmarkDeliveryFromNodesWithCheck(b *testing.B) { } func benchmarkDeliveryFromNodes(b *testing.B, nodes, conns, chunkCount int, skipCheck bool) { - defaultSkipCheck = skipCheck - toAddr = network.NewAddrFromNodeID - createStoreFunc = createTestLocalStorageFromSim - registries = make(map[discover.NodeID]*TestRegistry) + sim := simulation.New(map[string]simulation.ServiceFunc{ + "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { - timeout := 300 * time.Second - ctx, cancel := context.WithTimeout(context.Background(), timeout) - defer cancel() - - conf := &streamTesting.RunConfig{ - Adapter: *adapter, - NodeCount: nodes, - ConnLevel: conns, - ToAddr: toAddr, - Services: services, - EnableMsgEvents: false, - } - sim, teardown, err := streamTesting.NewSimulation(conf) - var rpcSubscriptionsWg sync.WaitGroup - defer func() { - rpcSubscriptionsWg.Wait() - teardown() - }() - if err != nil { - b.Fatal(err.Error()) - } - - stores = make(map[discover.NodeID]storage.ChunkStore) - deliveries = make(map[discover.NodeID]*Delivery) - for i, id := range sim.IDs { - stores[id] = sim.Stores[i] - } - peerCount = func(id discover.NodeID) int { - if sim.IDs[0] == id || sim.IDs[nodes-1] == id { - return 1 - } - return 2 - } - // wait channel for all nodes all peer connections to set up - waitPeerErrC = make(chan error) - - // create a FileStore for the last node in the chain which we are gonna write to - remoteFileStore := storage.NewFileStore(sim.Stores[nodes-1], storage.NewFileStoreParams()) - - // channel to signal simulation initialisation with action call complete - // or node disconnections - disconnectC := make(chan error) - quitC := make(chan struct{}) - - initC := make(chan error) - - action := func(ctx context.Context) error { - // each node Subscribes to each other's swarmChunkServerStreamName - // need to wait till an aynchronous process registers the peers in streamer.peers - // that is used by Subscribe - // waitPeerErrC using a global err channel to share betweem action and node service - i := 0 - for err := range waitPeerErrC { + id := ctx.Config.ID + addr := network.NewAddrFromNodeID(id) + store, datadir, err := createTestLocalStorageForID(id, addr) if err != nil { - return fmt.Errorf("error waiting for peers: %s", err) + return nil, nil, err } - i++ - if i == nodes { - break + bucket.Store(bucketKeyStore, store) + cleanup = func() { + os.RemoveAll(datadir) + store.Close() } - } - var err error - // each node except the last one subscribes to the upstream swarm chunk server stream - // which responds to chunk retrieve requests - for j := 0; j < nodes-1; j++ { - id := sim.IDs[j] - err = sim.CallClient(id, func(client *rpc.Client) error { - doneC, err := streamTesting.WatchDisconnections(id, client, disconnectC, quitC) - if err != nil { - return err - } - rpcSubscriptionsWg.Add(1) - go func() { - <-doneC - rpcSubscriptionsWg.Done() - }() - ctx, cancel := context.WithTimeout(ctx, 1*time.Second) - defer cancel() - sid := sim.IDs[j+1] // the upstream peer's id - return client.CallContext(ctx, nil, "stream_subscribeStream", sid, NewStream(swarmChunkServerStreamName, "", false), NewRange(0, 0), Top) + localStore := store.(*storage.LocalStore) + db := storage.NewDBAPI(localStore) + kad := network.NewKademlia(addr.Over(), network.NewKadParams()) + delivery := NewDelivery(kad, db) + + r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{ + SkipCheck: skipCheck, + DoSync: true, + SyncUpdateDelay: 0, }) - if err != nil { - break + + retrieveFunc := func(ctx context.Context, chunk *storage.Chunk) error { + return delivery.RequestFromPeers(ctx, chunk.Addr[:], skipCheck) } - } - initC <- err - return nil - } + netStore := storage.NewNetStore(localStore, retrieveFunc) + fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams()) + bucket.Store(bucketKeyFileStore, fileStore) - // the check function is only triggered when the benchmark finishes - trigger := make(chan discover.NodeID) - check := func(ctx context.Context, id discover.NodeID) (_ bool, err error) { - return true, nil - } + return r, cleanup, nil - conf.Step = &simulations.Step{ - Action: action, - Trigger: trigger, - // we are only testing the pivot node (net.Nodes[0]) - Expect: &simulations.Expectation{ - Nodes: sim.IDs[0:1], - Check: check, }, - } + }) + defer sim.Close() - // run the simulation in the background - errc := make(chan error) - go func() { - _, err := sim.Run(ctx, conf) - close(quitC) - errc <- err - }() - - // wait for simulation action to complete stream subscriptions - err = <-initC + log.Info("Initializing test config") + _, err := sim.AddNodesAndConnectChain(nodes) if err != nil { - b.Fatalf("simulation failed to initialise. expected no error. got %v", err) + b.Fatal(err) } - // create a retriever FileStore for the pivot node - // by now deliveries are set for each node by the streamer service - delivery := deliveries[sim.IDs[0]] - retrieveFunc := func(ctx context.Context, chunk *storage.Chunk) error { - return delivery.RequestFromPeers(ctx, chunk.Addr[:], skipCheck) - } - netStore := storage.NewNetStore(sim.Stores[0].(*storage.LocalStore), retrieveFunc) + ctx := context.Background() + result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error { + nodeIDs := sim.UpNodeIDs() + node := nodeIDs[len(nodeIDs)-1] - // benchmark loop - b.ResetTimer() - b.StopTimer() -Loop: - for i := 0; i < b.N; i++ { - // uploading chunkCount random chunks to the last node - hashes := make([]storage.Address, chunkCount) - for i := 0; i < chunkCount; i++ { - // create actual size real chunks - ctx := context.TODO() - hash, wait, err := remoteFileStore.Store(ctx, io.LimitReader(crand.Reader, int64(chunkSize)), int64(chunkSize), false) - if err != nil { - b.Fatalf("expected no error. got %v", err) - } - // wait until all chunks stored - err = wait(ctx) - if err != nil { - b.Fatalf("expected no error. got %v", err) - } - // collect the hashes - hashes[i] = hash + item, ok := sim.NodeItem(node, bucketKeyFileStore) + if !ok { + b.Fatal("No filestore") } - // now benchmark the actual retrieval - // netstore.Get is called for each hash in a go routine and errors are collected - b.StartTimer() - errs := make(chan error) - for _, hash := range hashes { - go func(h storage.Address) { - _, err := netStore.Get(ctx, h) - log.Warn("test check netstore get", "hash", h, "err", err) - errs <- err - }(hash) + remoteFileStore := item.(*storage.FileStore) + + pivotNode := nodeIDs[0] + item, ok = sim.NodeItem(pivotNode, bucketKeyNetStore) + if !ok { + b.Fatal("No filestore") } - // count and report retrieval errors - // if there are misses then chunk timeout is too low for the distance and volume (?) - var total, misses int - for err := range errs { - if err != nil { - log.Warn(err.Error()) - misses++ - } - total++ - if total == chunkCount { - break - } + netStore := item.(*storage.NetStore) + + if _, err := sim.WaitTillHealthy(ctx, 2); err != nil { + return err } + + disconnections := sim.PeerEvents( + context.Background(), + sim.NodeIDs(), + simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeDrop), + ) + + go func() { + for d := range disconnections { + if d.Error != nil { + log.Error("peer drop", "node", d.NodeID, "peer", d.Event.Peer) + b.Fatal(d.Error) + } + } + }() + // benchmark loop + b.ResetTimer() b.StopTimer() + Loop: + for i := 0; i < b.N; i++ { + // uploading chunkCount random chunks to the last node + hashes := make([]storage.Address, chunkCount) + for i := 0; i < chunkCount; i++ { + // create actual size real chunks + ctx := context.TODO() + hash, wait, err := remoteFileStore.Store(ctx, io.LimitReader(crand.Reader, int64(chunkSize)), int64(chunkSize), false) + if err != nil { + b.Fatalf("expected no error. got %v", err) + } + // wait until all chunks stored + err = wait(ctx) + if err != nil { + b.Fatalf("expected no error. got %v", err) + } + // collect the hashes + hashes[i] = hash + } + // now benchmark the actual retrieval + // netstore.Get is called for each hash in a go routine and errors are collected + b.StartTimer() + errs := make(chan error) + for _, hash := range hashes { + go func(h storage.Address) { + _, err := netStore.Get(ctx, h) + log.Warn("test check netstore get", "hash", h, "err", err) + errs <- err + }(hash) + } + // count and report retrieval errors + // if there are misses then chunk timeout is too low for the distance and volume (?) + var total, misses int + for err := range errs { + if err != nil { + log.Warn(err.Error()) + misses++ + } + total++ + if total == chunkCount { + break + } + } + b.StopTimer() - select { - case err = <-disconnectC: - if err != nil { + if misses > 0 { + err = fmt.Errorf("%v chunk not found out of %v", misses, total) break Loop } - default: } - - if misses > 0 { - err = fmt.Errorf("%v chunk not found out of %v", misses, total) - break Loop + if err != nil { + b.Fatal(err) } + return nil + }) + if result.Error != nil { + b.Fatal(result.Error) } - select { - case <-quitC: - case trigger <- sim.IDs[0]: - } - if err == nil { - err = <-errc - } else { - if e := <-errc; e != nil { - b.Errorf("sim.Run function error: %v", e) - } - } - - // benchmark over, trigger the check function to conclude the simulation - if err != nil { - b.Fatalf("expected no error. got %v", err) - } -} - -func createTestLocalStorageFromSim(id discover.NodeID, addr *network.BzzAddr) (storage.ChunkStore, error) { - return stores[id], nil } diff --git a/swarm/network/stream/intervals_test.go b/swarm/network/stream/intervals_test.go index d996cdc7e..f4294134b 100644 --- a/swarm/network/stream/intervals_test.go +++ b/swarm/network/stream/intervals_test.go @@ -22,52 +22,22 @@ import ( "encoding/binary" "fmt" "io" + "os" "sync" "testing" "time" + "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/node" + "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/discover" - "github.com/ethereum/go-ethereum/p2p/simulations" "github.com/ethereum/go-ethereum/p2p/simulations/adapters" - "github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/swarm/network" - streamTesting "github.com/ethereum/go-ethereum/swarm/network/stream/testing" + "github.com/ethereum/go-ethereum/swarm/network/simulation" "github.com/ethereum/go-ethereum/swarm/state" "github.com/ethereum/go-ethereum/swarm/storage" ) -var ( - externalStreamName = "externalStream" - externalStreamSessionAt uint64 = 50 - externalStreamMaxKeys uint64 = 100 -) - -func newIntervalsStreamerService(ctx *adapters.ServiceContext) (node.Service, error) { - id := ctx.Config.ID - addr := toAddr(id) - kad := network.NewKademlia(addr.Over(), network.NewKadParams()) - store := stores[id].(*storage.LocalStore) - db := storage.NewDBAPI(store) - delivery := NewDelivery(kad, db) - deliveries[id] = delivery - r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{ - SkipCheck: defaultSkipCheck, - }) - - r.RegisterClientFunc(externalStreamName, func(p *Peer, t string, live bool) (Client, error) { - return newTestExternalClient(db), nil - }) - r.RegisterServerFunc(externalStreamName, func(p *Peer, t string, live bool) (Server, error) { - return newTestExternalServer(t, externalStreamSessionAt, externalStreamMaxKeys, nil), nil - }) - - go func() { - waitPeerErrC <- waitForPeers(r, 1*time.Second, peerCount(id)) - }() - return &TestExternalRegistry{r}, nil -} - func TestIntervals(t *testing.T) { testIntervals(t, true, nil, false) testIntervals(t, false, NewRange(9, 26), false) @@ -81,237 +51,337 @@ func TestIntervals(t *testing.T) { func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) { nodes := 2 chunkCount := dataChunkCount + externalStreamName := "externalStream" + externalStreamSessionAt := uint64(50) + externalStreamMaxKeys := uint64(100) - defer setDefaultSkipCheck(defaultSkipCheck) - defaultSkipCheck = skipCheck + sim := simulation.New(map[string]simulation.ServiceFunc{ + "intervalsStreamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { - toAddr = network.NewAddrFromNodeID - conf := &streamTesting.RunConfig{ - Adapter: *adapter, - NodeCount: nodes, - ConnLevel: 1, - ToAddr: toAddr, - Services: services, - DefaultService: "intervalsStreamer", - } - - sim, teardown, err := streamTesting.NewSimulation(conf) - var rpcSubscriptionsWg sync.WaitGroup - defer func() { - rpcSubscriptionsWg.Wait() - teardown() - }() - if err != nil { - t.Fatal(err) - } - - stores = make(map[discover.NodeID]storage.ChunkStore) - deliveries = make(map[discover.NodeID]*Delivery) - for i, id := range sim.IDs { - stores[id] = sim.Stores[i] - } - - peerCount = func(id discover.NodeID) int { - return 1 - } - - fileStore := storage.NewFileStore(sim.Stores[0], storage.NewFileStoreParams()) - size := chunkCount * chunkSize - ctx := context.TODO() - _, wait, err := fileStore.Store(ctx, io.LimitReader(crand.Reader, int64(size)), int64(size), false) - if err != nil { - t.Fatal(err) - } - err = wait(ctx) - if err != nil { - t.Fatal(err) - } - - errc := make(chan error, 1) - waitPeerErrC = make(chan error) - quitC := make(chan struct{}) - defer close(quitC) - - action := func(ctx context.Context) error { - i := 0 - for err := range waitPeerErrC { + id := ctx.Config.ID + addr := network.NewAddrFromNodeID(id) + store, datadir, err := createTestLocalStorageForID(id, addr) if err != nil { - return fmt.Errorf("error waiting for peers: %s", err) + return nil, nil, err } - i++ - if i == nodes { - break + bucket.Store(bucketKeyStore, store) + cleanup = func() { + store.Close() + os.RemoveAll(datadir) } + localStore := store.(*storage.LocalStore) + db := storage.NewDBAPI(localStore) + kad := network.NewKademlia(addr.Over(), network.NewKadParams()) + delivery := NewDelivery(kad, db) + + r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{ + SkipCheck: skipCheck, + }) + bucket.Store(bucketKeyRegistry, r) + + r.RegisterClientFunc(externalStreamName, func(p *Peer, t string, live bool) (Client, error) { + return newTestExternalClient(db), nil + }) + r.RegisterServerFunc(externalStreamName, func(p *Peer, t string, live bool) (Server, error) { + return newTestExternalServer(t, externalStreamSessionAt, externalStreamMaxKeys, nil), nil + }) + + fileStore := storage.NewFileStore(localStore, storage.NewFileStoreParams()) + bucket.Store(bucketKeyFileStore, fileStore) + + return r, cleanup, nil + + }, + }) + defer sim.Close() + + log.Info("Adding nodes to simulation") + _, err := sim.AddNodesAndConnectChain(nodes) + if err != nil { + t.Fatal(err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Second) + defer cancel() + + result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error { + nodeIDs := sim.UpNodeIDs() + storer := nodeIDs[0] + checker := nodeIDs[1] + + item, ok := sim.NodeItem(storer, bucketKeyFileStore) + if !ok { + return fmt.Errorf("No filestore") + } + fileStore := item.(*storage.FileStore) + + size := chunkCount * chunkSize + _, wait, err := fileStore.Store(ctx, io.LimitReader(crand.Reader, int64(size)), int64(size), false) + if err != nil { + log.Error("Store error: %v", "err", err) + t.Fatal(err) + } + err = wait(ctx) + if err != nil { + log.Error("Wait error: %v", "err", err) + t.Fatal(err) } - id := sim.IDs[1] + item, ok = sim.NodeItem(checker, bucketKeyRegistry) + if !ok { + return fmt.Errorf("No registry") + } + registry := item.(*Registry) - err := sim.CallClient(id, func(client *rpc.Client) error { + liveErrC := make(chan error) + historyErrC := make(chan error) - sid := sim.IDs[0] + if _, err := sim.WaitTillHealthy(ctx, 2); err != nil { + log.Error("WaitKademlia error: %v", "err", err) + return err + } - doneC, err := streamTesting.WatchDisconnections(id, client, errc, quitC) + log.Debug("Watching for disconnections") + disconnections := sim.PeerEvents( + context.Background(), + sim.NodeIDs(), + simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeDrop), + ) + + go func() { + for d := range disconnections { + if d.Error != nil { + log.Error("peer drop", "node", d.NodeID, "peer", d.Event.Peer) + t.Fatal(d.Error) + } + } + }() + + go func() { + if !live { + close(liveErrC) + return + } + + var err error + defer func() { + liveErrC <- err + }() + + // live stream + var liveHashesChan chan []byte + liveHashesChan, err = getHashes(ctx, registry, storer, NewStream(externalStreamName, "", true)) if err != nil { - return err + log.Error("Subscription error: %v", "err", err) + return } - rpcSubscriptionsWg.Add(1) - go func() { - <-doneC - rpcSubscriptionsWg.Done() - }() - ctx, cancel := context.WithTimeout(ctx, 100*time.Second) - defer cancel() + i := externalStreamSessionAt - err = client.CallContext(ctx, nil, "stream_subscribeStream", sid, NewStream(externalStreamName, "", live), history, Top) + // we have subscribed, enable notifications + err = enableNotifications(registry, storer, NewStream(externalStreamName, "", true)) if err != nil { - return err + return } - liveErrC := make(chan error) - historyErrC := make(chan error) - - go func() { - if !live { - close(liveErrC) - return - } - - var err error - defer func() { - liveErrC <- err - }() - - // live stream - liveHashesChan := make(chan []byte) - liveSubscription, err := client.Subscribe(ctx, "stream", liveHashesChan, "getHashes", sid, NewStream(externalStreamName, "", true)) - if err != nil { - return - } - defer liveSubscription.Unsubscribe() - - i := externalStreamSessionAt - - // we have subscribed, enable notifications - err = client.CallContext(ctx, nil, "stream_enableNotifications", sid, NewStream(externalStreamName, "", true)) - if err != nil { - return - } - - for { - select { - case hash := <-liveHashesChan: - h := binary.BigEndian.Uint64(hash) - if h != i { - err = fmt.Errorf("expected live hash %d, got %d", i, h) - return - } - i++ - if i > externalStreamMaxKeys { - return - } - case err = <-liveSubscription.Err(): - return - case <-ctx.Done(): + for { + select { + case hash := <-liveHashesChan: + h := binary.BigEndian.Uint64(hash) + if h != i { + err = fmt.Errorf("expected live hash %d, got %d", i, h) return } + i++ + if i > externalStreamMaxKeys { + return + } + case <-ctx.Done(): + return } + } + }() + + go func() { + if live && history == nil { + close(historyErrC) + return + } + + var err error + defer func() { + historyErrC <- err }() - go func() { - if live && history == nil { - close(historyErrC) - return - } - - var err error - defer func() { - historyErrC <- err - }() - - // history stream - historyHashesChan := make(chan []byte) - historySubscription, err := client.Subscribe(ctx, "stream", historyHashesChan, "getHashes", sid, NewStream(externalStreamName, "", false)) - if err != nil { - return - } - defer historySubscription.Unsubscribe() - - var i uint64 - historyTo := externalStreamMaxKeys - if history != nil { - i = history.From - if history.To != 0 { - historyTo = history.To - } - } - - // we have subscribed, enable notifications - err = client.CallContext(ctx, nil, "stream_enableNotifications", sid, NewStream(externalStreamName, "", false)) - if err != nil { - return - } - - for { - select { - case hash := <-historyHashesChan: - h := binary.BigEndian.Uint64(hash) - if h != i { - err = fmt.Errorf("expected history hash %d, got %d", i, h) - return - } - i++ - if i > historyTo { - return - } - case err = <-historySubscription.Err(): - return - case <-ctx.Done(): - return - } - } - }() - - if err := <-liveErrC; err != nil { - return err - } - if err := <-historyErrC; err != nil { - return err + // history stream + var historyHashesChan chan []byte + historyHashesChan, err = getHashes(ctx, registry, storer, NewStream(externalStreamName, "", false)) + if err != nil { + return } - return nil - }) + var i uint64 + historyTo := externalStreamMaxKeys + if history != nil { + i = history.From + if history.To != 0 { + historyTo = history.To + } + } + + // we have subscribed, enable notifications + err = enableNotifications(registry, storer, NewStream(externalStreamName, "", false)) + if err != nil { + return + } + + for { + select { + case hash := <-historyHashesChan: + h := binary.BigEndian.Uint64(hash) + if h != i { + err = fmt.Errorf("expected history hash %d, got %d", i, h) + return + } + i++ + if i > historyTo { + return + } + case <-ctx.Done(): + return + } + } + }() + + err = registry.Subscribe(storer, NewStream(externalStreamName, "", live), history, Top) + if err != nil { + return err + } + if err := <-liveErrC; err != nil { + return err + } + if err := <-historyErrC; err != nil { + return err + } + + return nil + }) + + if result.Error != nil { + t.Fatal(result.Error) + } +} + +func getHashes(ctx context.Context, r *Registry, peerID discover.NodeID, s Stream) (chan []byte, error) { + peer := r.getPeer(peerID) + + client, err := peer.getClient(ctx, s) + if err != nil { + return nil, err + } + + c := client.Client.(*testExternalClient) + + return c.hashes, nil +} + +func enableNotifications(r *Registry, peerID discover.NodeID, s Stream) error { + peer := r.getPeer(peerID) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + client, err := peer.getClient(ctx, s) + if err != nil { return err } - check := func(ctx context.Context, id discover.NodeID) (bool, error) { - select { - case err := <-errc: - return false, err - case <-ctx.Done(): - return false, ctx.Err() - default: - } - return true, nil - } - conf.Step = &simulations.Step{ - Action: action, - Trigger: streamTesting.Trigger(10*time.Millisecond, quitC, sim.IDs[0]), - Expect: &simulations.Expectation{ - Nodes: sim.IDs[1:1], - Check: check, - }, - } - startedAt := time.Now() - timeout := 300 * time.Second - ctx, cancel := context.WithTimeout(context.Background(), timeout) - defer cancel() - result, err := sim.Run(ctx, conf) - finishedAt := time.Now() - if err != nil { - t.Fatalf("Setting up simulation failed: %v", err) - } - if result.Error != nil { - t.Fatalf("Simulation failed: %s", result.Error) - } - streamTesting.CheckResult(t, result, startedAt, finishedAt) + close(client.Client.(*testExternalClient).enableNotificationsC) + + return nil } + +type testExternalClient struct { + hashes chan []byte + db *storage.DBAPI + enableNotificationsC chan struct{} +} + +func newTestExternalClient(db *storage.DBAPI) *testExternalClient { + return &testExternalClient{ + hashes: make(chan []byte), + db: db, + enableNotificationsC: make(chan struct{}), + } +} + +func (c *testExternalClient) NeedData(ctx context.Context, hash []byte) func() { + chunk, _ := c.db.GetOrCreateRequest(ctx, hash) + if chunk.ReqC == nil { + return nil + } + c.hashes <- hash + //NOTE: This was failing on go1.9.x with a deadlock. + //Sometimes this function would just block + //It is commented now, but it may be well worth after the chunk refactor + //to re-enable this and see if the problem has been addressed + /* + return func() { + return chunk.WaitToStore() + } + */ + return nil +} + +func (c *testExternalClient) BatchDone(Stream, uint64, []byte, []byte) func() (*TakeoverProof, error) { + return nil +} + +func (c *testExternalClient) Close() {} + +const testExternalServerBatchSize = 10 + +type testExternalServer struct { + t string + keyFunc func(key []byte, index uint64) + sessionAt uint64 + maxKeys uint64 +} + +func newTestExternalServer(t string, sessionAt, maxKeys uint64, keyFunc func(key []byte, index uint64)) *testExternalServer { + if keyFunc == nil { + keyFunc = binary.BigEndian.PutUint64 + } + return &testExternalServer{ + t: t, + keyFunc: keyFunc, + sessionAt: sessionAt, + maxKeys: maxKeys, + } +} + +func (s *testExternalServer) SetNextBatch(from uint64, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) { + if from == 0 && to == 0 { + from = s.sessionAt + to = s.sessionAt + testExternalServerBatchSize + } + if to-from > testExternalServerBatchSize { + to = from + testExternalServerBatchSize - 1 + } + if from >= s.maxKeys && to > s.maxKeys { + return nil, 0, 0, nil, io.EOF + } + if to > s.maxKeys { + to = s.maxKeys + } + b := make([]byte, HashSize*(to-from+1)) + for i := from; i <= to; i++ { + s.keyFunc(b[(i-from)*HashSize:(i-from+1)*HashSize], i) + } + return b, from, to, nil, nil +} + +func (s *testExternalServer) GetData(context.Context, []byte) ([]byte, error) { + return make([]byte, 4096), nil +} + +func (s *testExternalServer) Close() {} diff --git a/swarm/network/stream/snapshot_retrieval_test.go b/swarm/network/stream/snapshot_retrieval_test.go index 9961a0bc7..4ff947b21 100644 --- a/swarm/network/stream/snapshot_retrieval_test.go +++ b/swarm/network/stream/snapshot_retrieval_test.go @@ -17,20 +17,19 @@ package stream import ( "context" - crand "crypto/rand" "fmt" - "math/rand" - "strings" + "os" "sync" "testing" "time" - "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/p2p/discover" - "github.com/ethereum/go-ethereum/p2p/simulations" + "github.com/ethereum/go-ethereum/p2p/simulations/adapters" "github.com/ethereum/go-ethereum/swarm/log" "github.com/ethereum/go-ethereum/swarm/network" - streamTesting "github.com/ethereum/go-ethereum/swarm/network/stream/testing" + "github.com/ethereum/go-ethereum/swarm/network/simulation" + "github.com/ethereum/go-ethereum/swarm/state" "github.com/ethereum/go-ethereum/swarm/storage" ) @@ -40,40 +39,6 @@ const ( maxFileSize = 40 ) -func initRetrievalTest() { - //global func to get overlay address from discover ID - toAddr = func(id discover.NodeID) *network.BzzAddr { - addr := network.NewAddrFromNodeID(id) - return addr - } - //global func to create local store - createStoreFunc = createTestLocalStorageForId - //local stores - stores = make(map[discover.NodeID]storage.ChunkStore) - //data directories for each node and store - datadirs = make(map[discover.NodeID]string) - //deliveries for each node - deliveries = make(map[discover.NodeID]*Delivery) - //global retrieve func - getRetrieveFunc = func(id discover.NodeID) func(ctx context.Context, chunk *storage.Chunk) error { - return func(ctx context.Context, chunk *storage.Chunk) error { - skipCheck := true - return deliveries[id].RequestFromPeers(ctx, chunk.Addr[:], skipCheck) - } - } - //registries, map of discover.NodeID to its streamer - registries = make(map[discover.NodeID]*TestRegistry) - //not needed for this test but required from common_test for NewStreamService - waitPeerErrC = make(chan error) - //also not needed for this test but required for NewStreamService - peerCount = func(id discover.NodeID) int { - if ids[0] == id || ids[len(ids)-1] == id { - return 1 - } - return 2 - } -} - //This test is a retrieval test for nodes. //A configurable number of nodes can be //provided to the test. @@ -81,7 +46,10 @@ func initRetrievalTest() { //Number of nodes can be provided via commandline too. func TestFileRetrieval(t *testing.T) { if *nodes != 0 { - fileRetrievalTest(t, *nodes) + err := runFileRetrievalTest(*nodes) + if err != nil { + t.Fatal(err) + } } else { nodeCnt := []int{16} //if the `longrunning` flag has been provided @@ -90,7 +58,10 @@ func TestFileRetrieval(t *testing.T) { nodeCnt = append(nodeCnt, 32, 64, 128) } for _, n := range nodeCnt { - fileRetrievalTest(t, n) + err := runFileRetrievalTest(n) + if err != nil { + t.Fatal(err) + } } } } @@ -105,7 +76,10 @@ func TestRetrieval(t *testing.T) { //if nodes/chunks have been provided via commandline, //run the tests with these values if *nodes != 0 && *chunks != 0 { - retrievalTest(t, *chunks, *nodes) + err := runRetrievalTest(*chunks, *nodes) + if err != nil { + t.Fatal(err) + } } else { var nodeCnt []int var chnkCnt []int @@ -121,76 +95,17 @@ func TestRetrieval(t *testing.T) { } for _, n := range nodeCnt { for _, c := range chnkCnt { - retrievalTest(t, c, n) + err := runRetrievalTest(c, n) + if err != nil { + t.Fatal(err) + } } } } } -//Every test runs 3 times, a live, a history, and a live AND history -func fileRetrievalTest(t *testing.T, nodeCount int) { - //test live and NO history - log.Info("Testing live and no history", "nodeCount", nodeCount) - live = true - history = false - err := runFileRetrievalTest(nodeCount) - if err != nil { - t.Fatal(err) - } - //test history only - log.Info("Testing history only", "nodeCount", nodeCount) - live = false - history = true - err = runFileRetrievalTest(nodeCount) - if err != nil { - t.Fatal(err) - } - //finally test live and history - log.Info("Testing live and history", "nodeCount", nodeCount) - live = true - err = runFileRetrievalTest(nodeCount) - if err != nil { - t.Fatal(err) - } -} - -//Every test runs 3 times, a live, a history, and a live AND history -func retrievalTest(t *testing.T, chunkCount int, nodeCount int) { - //test live and NO history - log.Info("Testing live and no history", "chunkCount", chunkCount, "nodeCount", nodeCount) - live = true - history = false - err := runRetrievalTest(chunkCount, nodeCount) - if err != nil { - t.Fatal(err) - } - //test history only - log.Info("Testing history only", "chunkCount", chunkCount, "nodeCount", nodeCount) - live = false - history = true - err = runRetrievalTest(chunkCount, nodeCount) - if err != nil { - t.Fatal(err) - } - //finally test live and history - log.Info("Testing live and history", "chunkCount", chunkCount, "nodeCount", nodeCount) - live = true - err = runRetrievalTest(chunkCount, nodeCount) - if err != nil { - t.Fatal(err) - } -} - /* -The upload is done by dependency to the global -`live` and `history` variables; - -If `live` is set, first stream subscriptions are established, -then files are uploaded to nodes. - -If `history` is enabled, first upload files, then build up subscriptions. - The test loads a snapshot file to construct the swarm network, assuming that the snapshot file identifies a healthy kademlia network. Nevertheless a health check runs in the @@ -199,261 +114,129 @@ simulation's `action` function. The snapshot should have 'streamer' in its service list. */ func runFileRetrievalTest(nodeCount int) error { - //for every run (live, history), int the variables - initRetrievalTest() - //the ids of the snapshot nodes, initiate only now as we need nodeCount - ids = make([]discover.NodeID, nodeCount) - //channel to check for disconnection errors - disconnectC := make(chan error) - //channel to close disconnection watcher routine - quitC := make(chan struct{}) - //the test conf (using same as in `snapshot_sync_test` - conf = &synctestConfig{} + sim := simulation.New(map[string]simulation.ServiceFunc{ + "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { + + id := ctx.Config.ID + addr := network.NewAddrFromNodeID(id) + store, datadir, err := createTestLocalStorageForID(id, addr) + if err != nil { + return nil, nil, err + } + bucket.Store(bucketKeyStore, store) + cleanup = func() { + os.RemoveAll(datadir) + store.Close() + } + localStore := store.(*storage.LocalStore) + db := storage.NewDBAPI(localStore) + kad := network.NewKademlia(addr.Over(), network.NewKadParams()) + delivery := NewDelivery(kad, db) + + r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{ + DoSync: true, + SyncUpdateDelay: 3 * time.Second, + }) + + fileStore := storage.NewFileStore(storage.NewNetStore(localStore, nil), storage.NewFileStoreParams()) + bucket.Store(bucketKeyFileStore, fileStore) + + return r, cleanup, nil + + }, + }) + defer sim.Close() + + log.Info("Initializing test config") + + conf := &synctestConfig{} + //map of discover ID to indexes of chunks expected at that ID + conf.idToChunksMap = make(map[discover.NodeID][]int) //map of overlay address to discover ID - conf.addrToIdMap = make(map[string]discover.NodeID) + conf.addrToIDMap = make(map[string]discover.NodeID) //array where the generated chunk hashes will be stored conf.hashes = make([]storage.Address, 0) - //load nodes from the snapshot file - net, err := initNetWithSnapshot(nodeCount) + + err := sim.UploadSnapshot(fmt.Sprintf("testing/snapshot_%d.json", nodeCount)) if err != nil { return err } - var rpcSubscriptionsWg sync.WaitGroup - //do cleanup after test is terminated - defer func() { - //shutdown the snapshot network - net.Shutdown() - //after the test, clean up local stores initialized with createLocalStoreForId - localStoreCleanup() - //finally clear all data directories - datadirsCleanup() - }() - //get the nodes of the network - nodes := net.GetNodes() - //iterate over all nodes... - for c := 0; c < len(nodes); c++ { - //create an array of discovery nodeIDS - ids[c] = nodes[c].ID() - a := network.ToOverlayAddr(ids[c].Bytes()) - //append it to the array of all overlay addresses - conf.addrs = append(conf.addrs, a) - conf.addrToIdMap[string(a)] = ids[c] - } - //needed for healthy call - ppmap = network.NewPeerPotMap(testMinProxBinSize, conf.addrs) + ctx, cancelSimRun := context.WithTimeout(context.Background(), 1*time.Minute) + defer cancelSimRun() - //an array for the random files - var randomFiles []string - //channel to signal when the upload has finished - uploadFinished := make(chan struct{}) - //channel to trigger new node checks - trigger := make(chan discover.NodeID) - //simulation action - action := func(ctx context.Context) error { - //first run the health check on all nodes, - //wait until nodes are all healthy - ticker := time.NewTicker(200 * time.Millisecond) - defer ticker.Stop() - for range ticker.C { - healthy := true - for _, id := range ids { - r := registries[id] - //PeerPot for this node - addr := common.Bytes2Hex(r.addr.OAddr) - pp := ppmap[addr] - //call Healthy RPC - h := r.delivery.overlay.Healthy(pp) - //print info - log.Debug(r.delivery.overlay.String()) - log.Debug(fmt.Sprintf("IS HEALTHY: %t", h.GotNN && h.KnowNN && h.Full)) - if !h.GotNN || !h.Full { - healthy = false - break + result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error { + nodeIDs := sim.UpNodeIDs() + for _, n := range nodeIDs { + //get the kademlia overlay address from this ID + a := network.ToOverlayAddr(n.Bytes()) + //append it to the array of all overlay addresses + conf.addrs = append(conf.addrs, a) + //the proximity calculation is on overlay addr, + //the p2p/simulations check func triggers on discover.NodeID, + //so we need to know which overlay addr maps to which nodeID + conf.addrToIDMap[string(a)] = n + } + + //an array for the random files + var randomFiles []string + //channel to signal when the upload has finished + //uploadFinished := make(chan struct{}) + //channel to trigger new node checks + + conf.hashes, randomFiles, err = uploadFilesToNodes(sim) + if err != nil { + return err + } + if _, err := sim.WaitTillHealthy(ctx, 2); err != nil { + return err + } + + // File retrieval check is repeated until all uploaded files are retrieved from all nodes + // or until the timeout is reached. + allSuccess := false + for !allSuccess { + for _, id := range nodeIDs { + //for each expected chunk, check if it is in the local store + localChunks := conf.idToChunksMap[id] + localSuccess := true + for _, ch := range localChunks { + //get the real chunk by the index in the index array + chunk := conf.hashes[ch] + log.Trace(fmt.Sprintf("node has chunk: %s:", chunk)) + //check if the expected chunk is indeed in the localstore + var err error + //check on the node's FileStore (netstore) + item, ok := sim.NodeItem(id, bucketKeyFileStore) + if !ok { + return fmt.Errorf("No registry") + } + fileStore := item.(*storage.FileStore) + //check all chunks + for i, hash := range conf.hashes { + reader, _ := fileStore.Retrieve(context.TODO(), hash) + //check that we can read the file size and that it corresponds to the generated file size + if s, err := reader.Size(ctx, nil); err != nil || s != int64(len(randomFiles[i])) { + allSuccess = false + log.Warn("Retrieve error", "err", err, "hash", hash, "nodeId", id) + } else { + log.Debug(fmt.Sprintf("File with root hash %x successfully retrieved", hash)) + } + } + if err != nil { + log.Warn(fmt.Sprintf("Chunk %s NOT found for id %s", chunk, id)) + localSuccess = false + } else { + log.Debug(fmt.Sprintf("Chunk %s IS FOUND for id %s", chunk, id)) + } } - } - if healthy { - break + allSuccess = localSuccess } } - - if history { - log.Info("Uploading for history") - //If testing only history, we upload the chunk(s) first - conf.hashes, randomFiles, err = uploadFilesToNodes(nodes) - if err != nil { - return err - } + if !allSuccess { + return fmt.Errorf("Not all chunks succeeded!") } - - //variables needed to wait for all subscriptions established before uploading - errc := make(chan error) - - //now setup and start event watching in order to know when we can upload - ctx, watchCancel := context.WithTimeout(context.Background(), MaxTimeout*time.Second) - defer watchCancel() - - log.Info("Setting up stream subscription") - //We need two iterations, one to subscribe to the subscription events - //(so we know when setup phase is finished), and one to - //actually run the stream subscriptions. We can't do it in the same iteration, - //because while the first nodes in the loop are setting up subscriptions, - //the latter ones have not subscribed to listen to peer events yet, - //and then we miss events. - - //first iteration: setup disconnection watcher and subscribe to peer events - for j, id := range ids { - log.Trace(fmt.Sprintf("Subscribe to subscription events: %d", j)) - client, err := net.GetNode(id).Client() - if err != nil { - return err - } - wsDoneC := watchSubscriptionEvents(ctx, id, client, errc, quitC) - // doneC is nil, the error happened which is sent to errc channel, already - if wsDoneC == nil { - continue - } - rpcSubscriptionsWg.Add(1) - go func() { - <-wsDoneC - rpcSubscriptionsWg.Done() - }() - - //watch for peers disconnecting - wdDoneC, err := streamTesting.WatchDisconnections(id, client, disconnectC, quitC) - if err != nil { - return err - } - rpcSubscriptionsWg.Add(1) - go func() { - <-wdDoneC - rpcSubscriptionsWg.Done() - }() - } - - //second iteration: start syncing and setup stream subscriptions - for j, id := range ids { - log.Trace(fmt.Sprintf("Start syncing and stream subscriptions: %d", j)) - client, err := net.GetNode(id).Client() - if err != nil { - return err - } - //start syncing! - var cnt int - err = client.CallContext(ctx, &cnt, "stream_startSyncing") - if err != nil { - return err - } - //increment the number of subscriptions we need to wait for - //by the count returned from startSyncing (SYNC subscriptions) - subscriptionCount += cnt - //now also add the number of RETRIEVAL_REQUEST subscriptions - for snid := range registries[id].peers { - subscriptionCount++ - err = client.CallContext(ctx, nil, "stream_subscribeStream", snid, NewStream(swarmChunkServerStreamName, "", false), nil, Top) - if err != nil { - return err - } - } - } - - //now wait until the number of expected subscriptions has been finished - //`watchSubscriptionEvents` will write with a `nil` value to errc - //every time a `SubscriptionMsg` has been received - for err := range errc { - if err != nil { - return err - } - //`nil` received, decrement count - subscriptionCount-- - //all subscriptions received - if subscriptionCount == 0 { - break - } - } - - log.Info("Stream subscriptions successfully requested, action terminated") - - if live { - //upload generated files to nodes - var hashes []storage.Address - var rfiles []string - hashes, rfiles, err = uploadFilesToNodes(nodes) - if err != nil { - return err - } - conf.hashes = append(conf.hashes, hashes...) - randomFiles = append(randomFiles, rfiles...) - //signal to the trigger loop that the upload has finished - uploadFinished <- struct{}{} - } - return nil - } - - //check defines what will be checked during the test - check := func(ctx context.Context, id discover.NodeID) (bool, error) { - - select { - case <-ctx.Done(): - return false, ctx.Err() - case e := <-disconnectC: - log.Error(e.Error()) - return false, fmt.Errorf("Disconnect event detected, network unhealthy") - default: - } - log.Trace(fmt.Sprintf("Checking node: %s", id)) - //if there are more than one chunk, test only succeeds if all expected chunks are found - allSuccess := true - - //check on the node's FileStore (netstore) - fileStore := registries[id].fileStore - //check all chunks - for i, hash := range conf.hashes { - reader, _ := fileStore.Retrieve(context.TODO(), hash) - //check that we can read the file size and that it corresponds to the generated file size - if s, err := reader.Size(context.TODO(), nil); err != nil || s != int64(len(randomFiles[i])) { - allSuccess = false - log.Warn("Retrieve error", "err", err, "hash", hash, "nodeId", id) - } else { - log.Debug(fmt.Sprintf("File with root hash %x successfully retrieved", hash)) - } - } - - return allSuccess, nil - } - - //for each tick, run the checks on all nodes - timingTicker := time.NewTicker(5 * time.Second) - defer timingTicker.Stop() - go func() { - //for live upload, we should wait for uploads to have finished - //before starting to trigger the checks, due to file size - if live { - <-uploadFinished - } - for range timingTicker.C { - for i := 0; i < len(ids); i++ { - log.Trace(fmt.Sprintf("triggering step %d, id %s", i, ids[i])) - trigger <- ids[i] - } - } - }() - - log.Info("Starting simulation run...") - - timeout := MaxTimeout * time.Second - ctx, cancel := context.WithTimeout(context.Background(), timeout) - defer cancel() - - //run the simulation - result := simulations.NewSimulation(net).Run(ctx, &simulations.Step{ - Action: action, - Trigger: trigger, - Expect: &simulations.Expectation{ - Nodes: ids, - Check: check, - }, }) if result.Error != nil { @@ -466,14 +249,6 @@ func runFileRetrievalTest(nodeCount int) error { /* The test generates the given number of chunks. -The upload is done by dependency to the global -`live` and `history` variables; - -If `live` is set, first stream subscriptions are established, then -upload to a random node. - -If `history` is enabled, first upload then build up subscriptions. - The test loads a snapshot file to construct the swarm network, assuming that the snapshot file identifies a healthy kademlia network. Nevertheless a health check runs in the @@ -482,259 +257,129 @@ simulation's `action` function. The snapshot should have 'streamer' in its service list. */ func runRetrievalTest(chunkCount int, nodeCount int) error { - //for every run (live, history), int the variables - initRetrievalTest() - //the ids of the snapshot nodes, initiate only now as we need nodeCount - ids = make([]discover.NodeID, nodeCount) - //channel to check for disconnection errors - disconnectC := make(chan error) - //channel to close disconnection watcher routine - quitC := make(chan struct{}) - //the test conf (using same as in `snapshot_sync_test` - conf = &synctestConfig{} + sim := simulation.New(map[string]simulation.ServiceFunc{ + "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { + + id := ctx.Config.ID + addr := network.NewAddrFromNodeID(id) + store, datadir, err := createTestLocalStorageForID(id, addr) + if err != nil { + return nil, nil, err + } + bucket.Store(bucketKeyStore, store) + cleanup = func() { + os.RemoveAll(datadir) + store.Close() + } + localStore := store.(*storage.LocalStore) + db := storage.NewDBAPI(localStore) + kad := network.NewKademlia(addr.Over(), network.NewKadParams()) + delivery := NewDelivery(kad, db) + + r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{ + DoSync: true, + SyncUpdateDelay: 0, + }) + + fileStore := storage.NewFileStore(storage.NewNetStore(localStore, nil), storage.NewFileStoreParams()) + bucketKeyFileStore = simulation.BucketKey("filestore") + bucket.Store(bucketKeyFileStore, fileStore) + + return r, cleanup, nil + + }, + }) + defer sim.Close() + + conf := &synctestConfig{} + //map of discover ID to indexes of chunks expected at that ID + conf.idToChunksMap = make(map[discover.NodeID][]int) //map of overlay address to discover ID - conf.addrToIdMap = make(map[string]discover.NodeID) + conf.addrToIDMap = make(map[string]discover.NodeID) //array where the generated chunk hashes will be stored conf.hashes = make([]storage.Address, 0) - //load nodes from the snapshot file - net, err := initNetWithSnapshot(nodeCount) + + err := sim.UploadSnapshot(fmt.Sprintf("testing/snapshot_%d.json", nodeCount)) if err != nil { return err } - var rpcSubscriptionsWg sync.WaitGroup - //do cleanup after test is terminated - defer func() { - //shutdown the snapshot network - net.Shutdown() - //after the test, clean up local stores initialized with createLocalStoreForId - localStoreCleanup() - //finally clear all data directories - datadirsCleanup() - }() - //get the nodes of the network - nodes := net.GetNodes() - //select one index at random... - idx := rand.Intn(len(nodes)) - //...and get the the node at that index - //this is the node selected for upload - uploadNode := nodes[idx] - //iterate over all nodes... - for c := 0; c < len(nodes); c++ { - //create an array of discovery nodeIDS - ids[c] = nodes[c].ID() - a := network.ToOverlayAddr(ids[c].Bytes()) - //append it to the array of all overlay addresses - conf.addrs = append(conf.addrs, a) - conf.addrToIdMap[string(a)] = ids[c] - } - //needed for healthy call - ppmap = network.NewPeerPotMap(testMinProxBinSize, conf.addrs) + ctx := context.Background() + result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error { + nodeIDs := sim.UpNodeIDs() + for _, n := range nodeIDs { + //get the kademlia overlay address from this ID + a := network.ToOverlayAddr(n.Bytes()) + //append it to the array of all overlay addresses + conf.addrs = append(conf.addrs, a) + //the proximity calculation is on overlay addr, + //the p2p/simulations check func triggers on discover.NodeID, + //so we need to know which overlay addr maps to which nodeID + conf.addrToIDMap[string(a)] = n + } - trigger := make(chan discover.NodeID) - //simulation action - action := func(ctx context.Context) error { - //first run the health check on all nodes, - //wait until nodes are all healthy - ticker := time.NewTicker(200 * time.Millisecond) - defer ticker.Stop() - for range ticker.C { - healthy := true - for _, id := range ids { - r := registries[id] - //PeerPot for this node - addr := common.Bytes2Hex(network.ToOverlayAddr(id.Bytes())) - pp := ppmap[addr] - //call Healthy RPC - h := r.delivery.overlay.Healthy(pp) - //print info - log.Debug(r.delivery.overlay.String()) - log.Debug(fmt.Sprintf("IS HEALTHY: %t", h.GotNN && h.KnowNN && h.Full)) - if !h.GotNN || !h.Full { - healthy = false - break + //an array for the random files + var randomFiles []string + //this is the node selected for upload + node := sim.RandomUpNode() + item, ok := sim.NodeItem(node.ID, bucketKeyStore) + if !ok { + return fmt.Errorf("No localstore") + } + lstore := item.(*storage.LocalStore) + conf.hashes, err = uploadFileToSingleNodeStore(node.ID, chunkCount, lstore) + if err != nil { + return err + } + if _, err := sim.WaitTillHealthy(ctx, 2); err != nil { + return err + } + + // File retrieval check is repeated until all uploaded files are retrieved from all nodes + // or until the timeout is reached. + allSuccess := false + for !allSuccess { + for _, id := range nodeIDs { + //for each expected chunk, check if it is in the local store + localChunks := conf.idToChunksMap[id] + localSuccess := true + for _, ch := range localChunks { + //get the real chunk by the index in the index array + chunk := conf.hashes[ch] + log.Trace(fmt.Sprintf("node has chunk: %s:", chunk)) + //check if the expected chunk is indeed in the localstore + var err error + //check on the node's FileStore (netstore) + item, ok := sim.NodeItem(id, bucketKeyFileStore) + if !ok { + return fmt.Errorf("No registry") + } + fileStore := item.(*storage.FileStore) + //check all chunks + for i, hash := range conf.hashes { + reader, _ := fileStore.Retrieve(context.TODO(), hash) + //check that we can read the file size and that it corresponds to the generated file size + if s, err := reader.Size(ctx, nil); err != nil || s != int64(len(randomFiles[i])) { + allSuccess = false + log.Warn("Retrieve error", "err", err, "hash", hash, "nodeId", id) + } else { + log.Debug(fmt.Sprintf("File with root hash %x successfully retrieved", hash)) + } + } + if err != nil { + log.Warn(fmt.Sprintf("Chunk %s NOT found for id %s", chunk, id)) + localSuccess = false + } else { + log.Debug(fmt.Sprintf("Chunk %s IS FOUND for id %s", chunk, id)) + } } - } - if healthy { - break + allSuccess = localSuccess } } - - if history { - log.Info("Uploading for history") - //If testing only history, we upload the chunk(s) first - conf.hashes, err = uploadFileToSingleNodeStore(uploadNode.ID(), chunkCount) - if err != nil { - return err - } + if !allSuccess { + return fmt.Errorf("Not all chunks succeeded!") } - - //variables needed to wait for all subscriptions established before uploading - errc := make(chan error) - - //now setup and start event watching in order to know when we can upload - ctx, watchCancel := context.WithTimeout(context.Background(), MaxTimeout*time.Second) - defer watchCancel() - - log.Info("Setting up stream subscription") - //We need two iterations, one to subscribe to the subscription events - //(so we know when setup phase is finished), and one to - //actually run the stream subscriptions. We can't do it in the same iteration, - //because while the first nodes in the loop are setting up subscriptions, - //the latter ones have not subscribed to listen to peer events yet, - //and then we miss events. - - //first iteration: setup disconnection watcher and subscribe to peer events - for j, id := range ids { - log.Trace(fmt.Sprintf("Subscribe to subscription events: %d", j)) - client, err := net.GetNode(id).Client() - if err != nil { - return err - } - - //check for `SubscribeMsg` events to know when setup phase is complete - wsDoneC := watchSubscriptionEvents(ctx, id, client, errc, quitC) - // doneC is nil, the error happened which is sent to errc channel, already - if wsDoneC == nil { - continue - } - rpcSubscriptionsWg.Add(1) - go func() { - <-wsDoneC - rpcSubscriptionsWg.Done() - }() - - //watch for peers disconnecting - wdDoneC, err := streamTesting.WatchDisconnections(id, client, disconnectC, quitC) - if err != nil { - return err - } - rpcSubscriptionsWg.Add(1) - go func() { - <-wdDoneC - rpcSubscriptionsWg.Done() - }() - } - - //second iteration: start syncing and setup stream subscriptions - for j, id := range ids { - log.Trace(fmt.Sprintf("Start syncing and stream subscriptions: %d", j)) - client, err := net.GetNode(id).Client() - if err != nil { - return err - } - //start syncing! - var cnt int - err = client.CallContext(ctx, &cnt, "stream_startSyncing") - if err != nil { - return err - } - //increment the number of subscriptions we need to wait for - //by the count returned from startSyncing (SYNC subscriptions) - subscriptionCount += cnt - //now also add the number of RETRIEVAL_REQUEST subscriptions - for snid := range registries[id].peers { - subscriptionCount++ - err = client.CallContext(ctx, nil, "stream_subscribeStream", snid, NewStream(swarmChunkServerStreamName, "", false), nil, Top) - if err != nil { - return err - } - } - } - - //now wait until the number of expected subscriptions has been finished - //`watchSubscriptionEvents` will write with a `nil` value to errc - //every time a `SubscriptionMsg` has been received - for err := range errc { - if err != nil { - return err - } - //`nil` received, decrement count - subscriptionCount-- - //all subscriptions received - if subscriptionCount == 0 { - break - } - } - - log.Info("Stream subscriptions successfully requested, action terminated") - - if live { - //now upload the chunks to the selected random single node - chnks, err := uploadFileToSingleNodeStore(uploadNode.ID(), chunkCount) - if err != nil { - return err - } - conf.hashes = append(conf.hashes, chnks...) - } - return nil - } - - chunkSize := storage.DefaultChunkSize - - //check defines what will be checked during the test - check := func(ctx context.Context, id discover.NodeID) (bool, error) { - - //don't check the uploader node - if id == uploadNode.ID() { - return true, nil - } - - select { - case <-ctx.Done(): - return false, ctx.Err() - case e := <-disconnectC: - log.Error(e.Error()) - return false, fmt.Errorf("Disconnect event detected, network unhealthy") - default: - } - log.Trace(fmt.Sprintf("Checking node: %s", id)) - //if there are more than one chunk, test only succeeds if all expected chunks are found - allSuccess := true - - //check on the node's FileStore (netstore) - fileStore := registries[id].fileStore - //check all chunks - for _, chnk := range conf.hashes { - reader, _ := fileStore.Retrieve(context.TODO(), chnk) - //assuming that reading the Size of the chunk is enough to know we found it - if s, err := reader.Size(context.TODO(), nil); err != nil || s != chunkSize { - allSuccess = false - log.Warn("Retrieve error", "err", err, "chunk", chnk, "nodeId", id) - } else { - log.Debug(fmt.Sprintf("Chunk %x found", chnk)) - } - } - return allSuccess, nil - } - - //for each tick, run the checks on all nodes - timingTicker := time.NewTicker(5 * time.Second) - defer timingTicker.Stop() - go func() { - for range timingTicker.C { - for i := 0; i < len(ids); i++ { - log.Trace(fmt.Sprintf("triggering step %d, id %s", i, ids[i])) - trigger <- ids[i] - } - } - }() - - log.Info("Starting simulation run...") - - timeout := MaxTimeout * time.Second - ctx, cancel := context.WithTimeout(context.Background(), timeout) - defer cancel() - - //run the simulation - result := simulations.NewSimulation(net).Run(ctx, &simulations.Step{ - Action: action, - Trigger: trigger, - Expect: &simulations.Expectation{ - Nodes: ids, - Check: check, - }, }) if result.Error != nil { @@ -743,53 +388,3 @@ func runRetrievalTest(chunkCount int, nodeCount int) error { return nil } - -//upload generated files to nodes -//every node gets one file uploaded -func uploadFilesToNodes(nodes []*simulations.Node) ([]storage.Address, []string, error) { - nodeCnt := len(nodes) - log.Debug(fmt.Sprintf("Uploading %d files to nodes", nodeCnt)) - //array holding generated files - rfiles := make([]string, nodeCnt) - //array holding the root hashes of the files - rootAddrs := make([]storage.Address, nodeCnt) - - var err error - //for every node, generate a file and upload - for i, n := range nodes { - id := n.ID() - fileStore := registries[id].fileStore - //generate a file - rfiles[i], err = generateRandomFile() - if err != nil { - return nil, nil, err - } - //store it (upload it) on the FileStore - ctx := context.TODO() - rk, wait, err := fileStore.Store(ctx, strings.NewReader(rfiles[i]), int64(len(rfiles[i])), false) - log.Debug("Uploaded random string file to node") - if err != nil { - return nil, nil, err - } - err = wait(ctx) - if err != nil { - return nil, nil, err - } - rootAddrs[i] = rk - } - return rootAddrs, rfiles, nil -} - -//generate a random file (string) -func generateRandomFile() (string, error) { - //generate a random file size between minFileSize and maxFileSize - fileSize := rand.Intn(maxFileSize-minFileSize) + minFileSize - log.Debug(fmt.Sprintf("Generated file with filesize %d kB", fileSize)) - b := make([]byte, fileSize*1024) - _, err := crand.Read(b) - if err != nil { - log.Error("Error generating random file.", "err", err) - return "", err - } - return string(b), nil -} diff --git a/swarm/network/stream/snapshot_sync_test.go b/swarm/network/stream/snapshot_sync_test.go index 0b5257c60..2dfc5898f 100644 --- a/swarm/network/stream/snapshot_sync_test.go +++ b/swarm/network/stream/snapshot_sync_test.go @@ -18,12 +18,8 @@ package stream import ( "context" crand "crypto/rand" - "encoding/json" - "flag" "fmt" "io" - "io/ioutil" - "math/rand" "os" "sync" "testing" @@ -31,82 +27,27 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/discover" - "github.com/ethereum/go-ethereum/p2p/simulations" "github.com/ethereum/go-ethereum/p2p/simulations/adapters" - "github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/swarm/network" - streamTesting "github.com/ethereum/go-ethereum/swarm/network/stream/testing" + "github.com/ethereum/go-ethereum/swarm/network/simulation" "github.com/ethereum/go-ethereum/swarm/pot" + "github.com/ethereum/go-ethereum/swarm/state" "github.com/ethereum/go-ethereum/swarm/storage" + mockdb "github.com/ethereum/go-ethereum/swarm/storage/mock/db" ) const testMinProxBinSize = 2 const MaxTimeout = 600 -var ( - pof = pot.DefaultPof(256) - - conf *synctestConfig - ids []discover.NodeID - datadirs map[discover.NodeID]string - ppmap map[string]*network.PeerPot - - live bool - history bool - - longrunning = flag.Bool("longrunning", false, "do run long-running tests") -) - type synctestConfig struct { addrs [][]byte hashes []storage.Address idToChunksMap map[discover.NodeID][]int chunksToNodesMap map[string][]int - addrToIdMap map[string]discover.NodeID -} - -func init() { - rand.Seed(time.Now().Unix()) -} - -//common_test needs to initialize the test in a init() func -//in order for adapters to register the NewStreamerService; -//this service is dependent on some global variables -//we thus need to initialize first as init() as well. -func initSyncTest() { - //assign the toAddr func so NewStreamerService can build the addr - toAddr = func(id discover.NodeID) *network.BzzAddr { - addr := network.NewAddrFromNodeID(id) - return addr - } - //global func to create local store - if *useMockStore { - createStoreFunc = createMockStore - } else { - createStoreFunc = createTestLocalStorageForId - } - //local stores - stores = make(map[discover.NodeID]storage.ChunkStore) - //data directories for each node and store - datadirs = make(map[discover.NodeID]string) - //deliveries for each node - deliveries = make(map[discover.NodeID]*Delivery) - //registries, map of discover.NodeID to its streamer - registries = make(map[discover.NodeID]*TestRegistry) - //not needed for this test but required from common_test for NewStreamService - waitPeerErrC = make(chan error) - //also not needed for this test but required for NewStreamService - peerCount = func(id discover.NodeID) int { - if ids[0] == id || ids[len(ids)-1] == id { - return 1 - } - return 2 - } - if *useMockStore { - createGlobalStore() - } + addrToIDMap map[string]discover.NodeID } //This test is a syncing test for nodes. @@ -116,12 +57,12 @@ func initSyncTest() { //to the pivot node, and we check that nodes get the chunks //they are expected to store based on the syncing protocol. //Number of chunks and nodes can be provided via commandline too. -func TestSyncing(t *testing.T) { +func TestSyncingViaGlobalSync(t *testing.T) { //if nodes/chunks have been provided via commandline, //run the tests with these values if *nodes != 0 && *chunks != 0 { log.Info(fmt.Sprintf("Running test with %d chunks and %d nodes...", *chunks, *nodes)) - testSyncing(t, *chunks, *nodes) + testSyncingViaGlobalSync(t, *chunks, *nodes) } else { var nodeCnt []int var chnkCnt []int @@ -138,51 +79,194 @@ func TestSyncing(t *testing.T) { for _, chnk := range chnkCnt { for _, n := range nodeCnt { log.Info(fmt.Sprintf("Long running test with %d chunks and %d nodes...", chnk, n)) - testSyncing(t, chnk, n) + testSyncingViaGlobalSync(t, chnk, n) } } } } -//Do run the tests -//Every test runs 3 times, a live, a history, and a live AND history -func testSyncing(t *testing.T, chunkCount int, nodeCount int) { - //test live and NO history - log.Info("Testing live and no history") - live = true - history = false - err := runSyncTest(chunkCount, nodeCount, live, history) +func TestSyncingViaDirectSubscribe(t *testing.T) { + //if nodes/chunks have been provided via commandline, + //run the tests with these values + if *nodes != 0 && *chunks != 0 { + log.Info(fmt.Sprintf("Running test with %d chunks and %d nodes...", *chunks, *nodes)) + err := testSyncingViaDirectSubscribe(*chunks, *nodes) + if err != nil { + t.Fatal(err) + } + } else { + var nodeCnt []int + var chnkCnt []int + //if the `longrunning` flag has been provided + //run more test combinations + if *longrunning { + chnkCnt = []int{1, 8, 32, 256, 1024} + nodeCnt = []int{32, 16} + } else { + //default test + chnkCnt = []int{4, 32} + nodeCnt = []int{32, 16} + } + for _, chnk := range chnkCnt { + for _, n := range nodeCnt { + log.Info(fmt.Sprintf("Long running test with %d chunks and %d nodes...", chnk, n)) + err := testSyncingViaDirectSubscribe(chnk, n) + if err != nil { + t.Fatal(err) + } + } + } + } +} + +func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) { + sim := simulation.New(map[string]simulation.ServiceFunc{ + "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { + + id := ctx.Config.ID + addr := network.NewAddrFromNodeID(id) + store, datadir, err := createTestLocalStorageForID(id, addr) + if err != nil { + return nil, nil, err + } + bucket.Store(bucketKeyStore, store) + cleanup = func() { + os.RemoveAll(datadir) + store.Close() + } + localStore := store.(*storage.LocalStore) + db := storage.NewDBAPI(localStore) + kad := network.NewKademlia(addr.Over(), network.NewKadParams()) + delivery := NewDelivery(kad, db) + + r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{ + DoSync: true, + SyncUpdateDelay: 3 * time.Second, + }) + bucket.Store(bucketKeyRegistry, r) + + return r, cleanup, nil + + }, + }) + defer sim.Close() + + log.Info("Initializing test config") + + conf := &synctestConfig{} + //map of discover ID to indexes of chunks expected at that ID + conf.idToChunksMap = make(map[discover.NodeID][]int) + //map of overlay address to discover ID + conf.addrToIDMap = make(map[string]discover.NodeID) + //array where the generated chunk hashes will be stored + conf.hashes = make([]storage.Address, 0) + + err := sim.UploadSnapshot(fmt.Sprintf("testing/snapshot_%d.json", nodeCount)) if err != nil { t.Fatal(err) } - //test history only - log.Info("Testing history only") - live = false - history = true - err = runSyncTest(chunkCount, nodeCount, live, history) - if err != nil { - t.Fatal(err) - } - //finally test live and history - log.Info("Testing live and history") - live = true - err = runSyncTest(chunkCount, nodeCount, live, history) - if err != nil { - t.Fatal(err) + + ctx, cancelSimRun := context.WithTimeout(context.Background(), 1*time.Minute) + defer cancelSimRun() + + result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error { + nodeIDs := sim.UpNodeIDs() + for _, n := range nodeIDs { + //get the kademlia overlay address from this ID + a := network.ToOverlayAddr(n.Bytes()) + //append it to the array of all overlay addresses + conf.addrs = append(conf.addrs, a) + //the proximity calculation is on overlay addr, + //the p2p/simulations check func triggers on discover.NodeID, + //so we need to know which overlay addr maps to which nodeID + conf.addrToIDMap[string(a)] = n + } + + //get the the node at that index + //this is the node selected for upload + node := sim.RandomUpNode() + item, ok := sim.NodeItem(node.ID, bucketKeyStore) + if !ok { + return fmt.Errorf("No localstore") + } + lstore := item.(*storage.LocalStore) + hashes, err := uploadFileToSingleNodeStore(node.ID, chunkCount, lstore) + if err != nil { + return err + } + conf.hashes = append(conf.hashes, hashes...) + mapKeysToNodes(conf) + + if _, err := sim.WaitTillHealthy(ctx, 2); err != nil { + return err + } + + // File retrieval check is repeated until all uploaded files are retrieved from all nodes + // or until the timeout is reached. + allSuccess := false + var gDir string + var globalStore *mockdb.GlobalStore + if *useMockStore { + gDir, globalStore, err = createGlobalStore() + if err != nil { + return fmt.Errorf("Something went wrong; using mockStore enabled but globalStore is nil") + } + defer func() { + os.RemoveAll(gDir) + err := globalStore.Close() + if err != nil { + log.Error("Error closing global store! %v", "err", err) + } + }() + } + for !allSuccess { + for _, id := range nodeIDs { + //for each expected chunk, check if it is in the local store + localChunks := conf.idToChunksMap[id] + localSuccess := true + for _, ch := range localChunks { + //get the real chunk by the index in the index array + chunk := conf.hashes[ch] + log.Trace(fmt.Sprintf("node has chunk: %s:", chunk)) + //check if the expected chunk is indeed in the localstore + var err error + if *useMockStore { + //use the globalStore if the mockStore should be used; in that case, + //the complete localStore stack is bypassed for getting the chunk + _, err = globalStore.Get(common.BytesToAddress(id.Bytes()), chunk) + } else { + //use the actual localstore + item, ok := sim.NodeItem(id, bucketKeyStore) + if !ok { + return fmt.Errorf("Error accessing localstore") + } + lstore := item.(*storage.LocalStore) + _, err = lstore.Get(ctx, chunk) + } + if err != nil { + log.Warn(fmt.Sprintf("Chunk %s NOT found for id %s", chunk, id)) + localSuccess = false + } else { + log.Debug(fmt.Sprintf("Chunk %s IS FOUND for id %s", chunk, id)) + } + } + allSuccess = localSuccess + } + } + if !allSuccess { + return fmt.Errorf("Not all chunks succeeded!") + } + return nil + }) + + if result.Error != nil { + t.Fatal(result.Error) } } /* The test generates the given number of chunks -The upload is done by dependency to the global -`live` and `history` variables; - -If `live` is set, first stream subscriptions are established, then -upload to a random node. - -If `history` is enabled, first upload then build up subscriptions. - For every chunk generated, the nearest node addresses are identified, we verify that the nodes closer to the chunk addresses actually do have the chunks in their local stores. @@ -190,178 +274,84 @@ chunk addresses actually do have the chunks in their local stores. The test loads a snapshot file to construct the swarm network, assuming that the snapshot file identifies a healthy kademlia network. The snapshot should have 'streamer' in its service list. - -For every test run, a series of three tests will be executed: -- a LIVE test first, where first subscriptions are established, - then a file (random chunks) is uploaded -- a HISTORY test, where the file is uploaded first, and then - the subscriptions are established -- a crude LIVE AND HISTORY test last, where (different) chunks - are uploaded twice, once before and once after subscriptions */ -func runSyncTest(chunkCount int, nodeCount int, live bool, history bool) error { - initSyncTest() - //the ids of the snapshot nodes, initiate only now as we need nodeCount - ids = make([]discover.NodeID, nodeCount) - //initialize the test struct - conf = &synctestConfig{} +func testSyncingViaDirectSubscribe(chunkCount int, nodeCount int) error { + sim := simulation.New(map[string]simulation.ServiceFunc{ + "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { + + id := ctx.Config.ID + addr := network.NewAddrFromNodeID(id) + store, datadir, err := createTestLocalStorageForID(id, addr) + if err != nil { + return nil, nil, err + } + bucket.Store(bucketKeyStore, store) + cleanup = func() { + os.RemoveAll(datadir) + store.Close() + } + localStore := store.(*storage.LocalStore) + db := storage.NewDBAPI(localStore) + kad := network.NewKademlia(addr.Over(), network.NewKadParams()) + delivery := NewDelivery(kad, db) + + r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), nil) + bucket.Store(bucketKeyRegistry, r) + + fileStore := storage.NewFileStore(storage.NewNetStore(localStore, nil), storage.NewFileStoreParams()) + bucket.Store(bucketKeyFileStore, fileStore) + + return r, cleanup, nil + + }, + }) + defer sim.Close() + + ctx, cancelSimRun := context.WithTimeout(context.Background(), 1*time.Minute) + defer cancelSimRun() + + conf := &synctestConfig{} //map of discover ID to indexes of chunks expected at that ID conf.idToChunksMap = make(map[discover.NodeID][]int) //map of overlay address to discover ID - conf.addrToIdMap = make(map[string]discover.NodeID) + conf.addrToIDMap = make(map[string]discover.NodeID) //array where the generated chunk hashes will be stored conf.hashes = make([]storage.Address, 0) - //channel to trigger node checks in the simulation - trigger := make(chan discover.NodeID) - //channel to check for disconnection errors - disconnectC := make(chan error) - //channel to close disconnection watcher routine - quitC := make(chan struct{}) - //load nodes from the snapshot file - net, err := initNetWithSnapshot(nodeCount) + err := sim.UploadSnapshot(fmt.Sprintf("testing/snapshot_%d.json", nodeCount)) if err != nil { return err } - var rpcSubscriptionsWg sync.WaitGroup - //do cleanup after test is terminated - defer func() { - // close quitC channel to signall all goroutines to clanup - // before calling simulation network shutdown. - close(quitC) - //wait for all rpc subscriptions to unsubscribe - rpcSubscriptionsWg.Wait() - //shutdown the snapshot network - net.Shutdown() - //after the test, clean up local stores initialized with createLocalStoreForId - localStoreCleanup() - //finally clear all data directories - datadirsCleanup() - }() - //get the nodes of the network - nodes := net.GetNodes() - //select one index at random... - idx := rand.Intn(len(nodes)) - //...and get the the node at that index - //this is the node selected for upload - node := nodes[idx] - log.Info("Initializing test config") - //iterate over all nodes... - for c := 0; c < len(nodes); c++ { - //create an array of discovery node IDs - ids[c] = nodes[c].ID() - //get the kademlia overlay address from this ID - a := network.ToOverlayAddr(ids[c].Bytes()) - //append it to the array of all overlay addresses - conf.addrs = append(conf.addrs, a) - //the proximity calculation is on overlay addr, - //the p2p/simulations check func triggers on discover.NodeID, - //so we need to know which overlay addr maps to which nodeID - conf.addrToIdMap[string(a)] = ids[c] - } - log.Info("Test config successfully initialized") - - //only needed for healthy call when debugging - ppmap = network.NewPeerPotMap(testMinProxBinSize, conf.addrs) - - //define the action to be performed before the test checks: start syncing - action := func(ctx context.Context) error { - //first run the health check on all nodes, - //wait until nodes are all healthy - ticker := time.NewTicker(200 * time.Millisecond) - defer ticker.Stop() - for range ticker.C { - healthy := true - for _, id := range ids { - r := registries[id] - //PeerPot for this node - addr := common.Bytes2Hex(network.ToOverlayAddr(id.Bytes())) - pp := ppmap[addr] - //call Healthy RPC - h := r.delivery.overlay.Healthy(pp) - //print info - log.Debug(r.delivery.overlay.String()) - log.Debug(fmt.Sprintf("IS HEALTHY: %t", h.GotNN && h.KnowNN && h.Full)) - if !h.GotNN || !h.Full { - healthy = false - break - } - } - if healthy { - break - } + result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error { + nodeIDs := sim.UpNodeIDs() + for _, n := range nodeIDs { + //get the kademlia overlay address from this ID + a := network.ToOverlayAddr(n.Bytes()) + //append it to the array of all overlay addresses + conf.addrs = append(conf.addrs, a) + //the proximity calculation is on overlay addr, + //the p2p/simulations check func triggers on discover.NodeID, + //so we need to know which overlay addr maps to which nodeID + conf.addrToIDMap[string(a)] = n } - if history { - log.Info("Uploading for history") - //If testing only history, we upload the chunk(s) first - chunks, err := uploadFileToSingleNodeStore(node.ID(), chunkCount) - if err != nil { - return err - } - conf.hashes = append(conf.hashes, chunks...) - //finally map chunks to the closest addresses - mapKeysToNodes(conf) - } + var subscriptionCount int - //variables needed to wait for all subscriptions established before uploading - errc := make(chan error) + filter := simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeMsgRecv).Protocol("stream").MsgCode(4) + eventC := sim.PeerEvents(ctx, nodeIDs, filter) - //now setup and start event watching in order to know when we can upload - ctx, watchCancel := context.WithTimeout(context.Background(), MaxTimeout*time.Second) - defer watchCancel() - - log.Info("Setting up stream subscription") - - //We need two iterations, one to subscribe to the subscription events - //(so we know when setup phase is finished), and one to - //actually run the stream subscriptions. We can't do it in the same iteration, - //because while the first nodes in the loop are setting up subscriptions, - //the latter ones have not subscribed to listen to peer events yet, - //and then we miss events. - - //first iteration: setup disconnection watcher and subscribe to peer events - for j, id := range ids { - log.Trace(fmt.Sprintf("Subscribe to subscription events: %d", j)) - client, err := net.GetNode(id).Client() - if err != nil { - return err - } - - wsDoneC := watchSubscriptionEvents(ctx, id, client, errc, quitC) - // doneC is nil, the error happened which is sent to errc channel, already - if wsDoneC == nil { - continue - } - rpcSubscriptionsWg.Add(1) - go func() { - <-wsDoneC - rpcSubscriptionsWg.Done() - }() - - //watch for peers disconnecting - wdDoneC, err := streamTesting.WatchDisconnections(id, client, disconnectC, quitC) - if err != nil { - return err - } - rpcSubscriptionsWg.Add(1) - go func() { - <-wdDoneC - rpcSubscriptionsWg.Done() - }() - } - - //second iteration: start syncing - for j, id := range ids { + for j, node := range nodeIDs { log.Trace(fmt.Sprintf("Start syncing subscriptions: %d", j)) - client, err := net.GetNode(id).Client() - if err != nil { - return err - } //start syncing! + item, ok := sim.NodeItem(node, bucketKeyRegistry) + if !ok { + return fmt.Errorf("No registry") + } + registry := item.(*Registry) + var cnt int - err = client.CallContext(ctx, &cnt, "stream_startSyncing") + cnt, err = startSyncing(registry, conf) if err != nil { return err } @@ -370,117 +360,89 @@ func runSyncTest(chunkCount int, nodeCount int, live bool, history bool) error { subscriptionCount += cnt } - //now wait until the number of expected subscriptions has been finished - //`watchSubscriptionEvents` will write with a `nil` value to errc - for err := range errc { - if err != nil { - return err + for e := range eventC { + if e.Error != nil { + return e.Error } - //`nil` received, decrement count subscriptionCount-- - //all subscriptions received if subscriptionCount == 0 { break } } + //select a random node for upload + node := sim.RandomUpNode() + item, ok := sim.NodeItem(node.ID, bucketKeyStore) + if !ok { + return fmt.Errorf("No localstore") + } + lstore := item.(*storage.LocalStore) + hashes, err := uploadFileToSingleNodeStore(node.ID, chunkCount, lstore) + if err != nil { + return err + } + conf.hashes = append(conf.hashes, hashes...) + mapKeysToNodes(conf) - log.Info("Stream subscriptions successfully requested") - if live { - //now upload the chunks to the selected random single node - hashes, err := uploadFileToSingleNodeStore(node.ID(), chunkCount) + if _, err := sim.WaitTillHealthy(ctx, 2); err != nil { + return err + } + + var gDir string + var globalStore *mockdb.GlobalStore + if *useMockStore { + gDir, globalStore, err = createGlobalStore() if err != nil { - return err + return fmt.Errorf("Something went wrong; using mockStore enabled but globalStore is nil") } - conf.hashes = append(conf.hashes, hashes...) - //finally map chunks to the closest addresses - log.Debug(fmt.Sprintf("Uploaded chunks for live syncing: %v", conf.hashes)) - mapKeysToNodes(conf) - log.Info(fmt.Sprintf("Uploaded %d chunks to random single node", chunkCount)) + defer os.RemoveAll(gDir) } - - log.Info("Action terminated") - - return nil - } - - //check defines what will be checked during the test - check := func(ctx context.Context, id discover.NodeID) (bool, error) { - select { - case <-ctx.Done(): - return false, ctx.Err() - case e := <-disconnectC: - log.Error(e.Error()) - return false, fmt.Errorf("Disconnect event detected, network unhealthy") - default: - } - log.Trace(fmt.Sprintf("Checking node: %s", id)) - //select the local store for the given node - //if there are more than one chunk, test only succeeds if all expected chunks are found - allSuccess := true - - //all the chunk indexes which are supposed to be found for this node - localChunks := conf.idToChunksMap[id] - //for each expected chunk, check if it is in the local store - for _, ch := range localChunks { - //get the real chunk by the index in the index array - chunk := conf.hashes[ch] - log.Trace(fmt.Sprintf("node has chunk: %s:", chunk)) - //check if the expected chunk is indeed in the localstore - var err error - if *useMockStore { - if globalStore == nil { - return false, fmt.Errorf("Something went wrong; using mockStore enabled but globalStore is nil") + // File retrieval check is repeated until all uploaded files are retrieved from all nodes + // or until the timeout is reached. + allSuccess := false + for !allSuccess { + for _, id := range nodeIDs { + //for each expected chunk, check if it is in the local store + localChunks := conf.idToChunksMap[id] + localSuccess := true + for _, ch := range localChunks { + //get the real chunk by the index in the index array + chunk := conf.hashes[ch] + log.Trace(fmt.Sprintf("node has chunk: %s:", chunk)) + //check if the expected chunk is indeed in the localstore + var err error + if *useMockStore { + //use the globalStore if the mockStore should be used; in that case, + //the complete localStore stack is bypassed for getting the chunk + _, err = globalStore.Get(common.BytesToAddress(id.Bytes()), chunk) + } else { + //use the actual localstore + item, ok := sim.NodeItem(id, bucketKeyStore) + if !ok { + return fmt.Errorf("Error accessing localstore") + } + lstore := item.(*storage.LocalStore) + _, err = lstore.Get(ctx, chunk) + } + if err != nil { + log.Warn(fmt.Sprintf("Chunk %s NOT found for id %s", chunk, id)) + localSuccess = false + } else { + log.Debug(fmt.Sprintf("Chunk %s IS FOUND for id %s", chunk, id)) + } } - //use the globalStore if the mockStore should be used; in that case, - //the complete localStore stack is bypassed for getting the chunk - _, err = globalStore.Get(common.BytesToAddress(id.Bytes()), chunk) - } else { - //use the actual localstore - lstore := stores[id] - _, err = lstore.Get(context.TODO(), chunk) - } - if err != nil { - log.Warn(fmt.Sprintf("Chunk %s NOT found for id %s", chunk, id)) - allSuccess = false - } else { - log.Debug(fmt.Sprintf("Chunk %s IS FOUND for id %s", chunk, id)) + allSuccess = localSuccess } } - - return allSuccess, nil - } - - //for each tick, run the checks on all nodes - timingTicker := time.NewTicker(time.Second * 1) - defer timingTicker.Stop() - go func() { - for range timingTicker.C { - for i := 0; i < len(ids); i++ { - log.Trace(fmt.Sprintf("triggering step %d, id %s", i, ids[i])) - trigger <- ids[i] - } + if !allSuccess { + return fmt.Errorf("Not all chunks succeeded!") } - }() - - log.Info("Starting simulation run...") - - timeout := MaxTimeout * time.Second - ctx, cancel := context.WithTimeout(context.Background(), timeout) - defer cancel() - - //run the simulation - result := simulations.NewSimulation(net).Run(ctx, &simulations.Step{ - Action: action, - Trigger: trigger, - Expect: &simulations.Expectation{ - Nodes: ids, - Check: check, - }, + return nil }) if result.Error != nil { return result.Error } + log.Info("Simulation terminated") return nil } @@ -489,20 +451,9 @@ func runSyncTest(chunkCount int, nodeCount int, live bool, history bool) error { //issues `RequestSubscriptionMsg` to peers, based on po, by iterating over //the kademlia's `EachBin` function. //returns the number of subscriptions requested -func (r *TestRegistry) StartSyncing(ctx context.Context) (int, error) { +func startSyncing(r *Registry, conf *synctestConfig) (int, error) { var err error - if log.Lvl(*loglevel) == log.LvlDebug { - //PeerPot for this node - addr := common.Bytes2Hex(r.addr.OAddr) - pp := ppmap[addr] - //call Healthy RPC - h := r.delivery.overlay.Healthy(pp) - //print info - log.Debug(r.delivery.overlay.String()) - log.Debug(fmt.Sprintf("IS HEALTHY: %t", h.GotNN && h.KnowNN && h.Full)) - } - kad, ok := r.delivery.overlay.(*network.Kademlia) if !ok { return 0, fmt.Errorf("Not a Kademlia!") @@ -512,14 +463,10 @@ func (r *TestRegistry) StartSyncing(ctx context.Context) (int, error) { //iterate over each bin and solicit needed subscription to bins kad.EachBin(r.addr.Over(), pof, 0, func(conn network.OverlayConn, po int) bool { //identify begin and start index of the bin(s) we want to subscribe to - log.Debug(fmt.Sprintf("Requesting subscription by: registry %s from peer %s for bin: %d", r.addr.ID(), conf.addrToIdMap[string(conn.Address())], po)) - var histRange *Range - if history { - histRange = &Range{} - } + histRange := &Range{} subCnt++ - err = r.RequestSubscription(conf.addrToIdMap[string(conn.Address())], NewStream("SYNC", FormatSyncBinKey(uint8(po)), live), histRange, Top) + err = r.RequestSubscription(conf.addrToIDMap[string(conn.Address())], NewStream("SYNC", FormatSyncBinKey(uint8(po)), true), histRange, Top) if err != nil { log.Error(fmt.Sprintf("Error in RequestSubsciption! %v", err)) return false @@ -552,7 +499,7 @@ func mapKeysToNodes(conf *synctestConfig) { return false } if pl == 256 || pl == po { - log.Trace(fmt.Sprintf("appending %s", conf.addrToIdMap[string(a)])) + log.Trace(fmt.Sprintf("appending %s", conf.addrToIDMap[string(a)])) nns = append(nns, indexmap[string(a)]) nodemap[string(a)] = append(nodemap[string(a)], i) } @@ -567,26 +514,24 @@ func mapKeysToNodes(conf *synctestConfig) { } for addr, chunks := range nodemap { //this selects which chunks are expected to be found with the given node - conf.idToChunksMap[conf.addrToIdMap[addr]] = chunks + conf.idToChunksMap[conf.addrToIDMap[addr]] = chunks } log.Debug(fmt.Sprintf("Map of expected chunks by ID: %v", conf.idToChunksMap)) conf.chunksToNodesMap = kmap } //upload a file(chunks) to a single local node store -func uploadFileToSingleNodeStore(id discover.NodeID, chunkCount int) ([]storage.Address, error) { +func uploadFileToSingleNodeStore(id discover.NodeID, chunkCount int, lstore *storage.LocalStore) ([]storage.Address, error) { log.Debug(fmt.Sprintf("Uploading to node id: %s", id)) - lstore := stores[id] - size := chunkSize fileStore := storage.NewFileStore(lstore, storage.NewFileStoreParams()) + size := chunkSize var rootAddrs []storage.Address for i := 0; i < chunkCount; i++ { - ctx := context.TODO() - rk, wait, err := fileStore.Store(ctx, io.LimitReader(crand.Reader, int64(size)), int64(size), false) + rk, wait, err := fileStore.Store(context.TODO(), io.LimitReader(crand.Reader, int64(size)), int64(size), false) if err != nil { return nil, err } - err = wait(ctx) + err = wait(context.TODO()) if err != nil { return nil, err } @@ -595,129 +540,3 @@ func uploadFileToSingleNodeStore(id discover.NodeID, chunkCount int) ([]storage. return rootAddrs, nil } - -//initialize a network from a snapshot -func initNetWithSnapshot(nodeCount int) (*simulations.Network, error) { - - var a adapters.NodeAdapter - //add the streamer service to the node adapter - - if *adapter == "exec" { - dirname, err := ioutil.TempDir(".", "") - if err != nil { - return nil, err - } - a = adapters.NewExecAdapter(dirname) - } else if *adapter == "tcp" { - a = adapters.NewTCPAdapter(services) - } else if *adapter == "sim" { - a = adapters.NewSimAdapter(services) - } - - log.Info("Setting up Snapshot network") - - net := simulations.NewNetwork(a, &simulations.NetworkConfig{ - ID: "0", - DefaultService: "streamer", - }) - - f, err := os.Open(fmt.Sprintf("testing/snapshot_%d.json", nodeCount)) - if err != nil { - return nil, err - } - defer f.Close() - jsonbyte, err := ioutil.ReadAll(f) - if err != nil { - return nil, err - } - var snap simulations.Snapshot - err = json.Unmarshal(jsonbyte, &snap) - if err != nil { - return nil, err - } - - //the snapshot probably has the property EnableMsgEvents not set - //just in case, set it to true! - //(we need this to wait for messages before uploading) - for _, n := range snap.Nodes { - n.Node.Config.EnableMsgEvents = true - } - - log.Info("Waiting for p2p connections to be established...") - - //now we can load the snapshot - err = net.Load(&snap) - if err != nil { - return nil, err - } - log.Info("Snapshot loaded") - return net, nil -} - -//we want to wait for subscriptions to be established before uploading to test -//that live syncing is working correctly -func watchSubscriptionEvents(ctx context.Context, id discover.NodeID, client *rpc.Client, errc chan error, quitC chan struct{}) (doneC <-chan struct{}) { - events := make(chan *p2p.PeerEvent) - sub, err := client.Subscribe(context.Background(), "admin", events, "peerEvents") - if err != nil { - log.Error(err.Error()) - errc <- fmt.Errorf("error getting peer events for node %v: %s", id, err) - return - } - c := make(chan struct{}) - - go func() { - defer func() { - log.Trace("watch subscription events: unsubscribe", "id", id) - sub.Unsubscribe() - close(c) - }() - - for { - select { - case <-quitC: - return - case <-ctx.Done(): - select { - case errc <- ctx.Err(): - case <-quitC: - } - return - case e := <-events: - //just catch SubscribeMsg - if e.Type == p2p.PeerEventTypeMsgRecv && e.Protocol == "stream" && e.MsgCode != nil && *e.MsgCode == 4 { - errc <- nil - } - case err := <-sub.Err(): - if err != nil { - select { - case errc <- fmt.Errorf("error getting peer events for node %v: %v", id, err): - case <-quitC: - } - return - } - } - } - }() - return c -} - -//create a local store for the given node -func createTestLocalStorageForId(id discover.NodeID, addr *network.BzzAddr) (storage.ChunkStore, error) { - var datadir string - var err error - datadir, err = ioutil.TempDir("", fmt.Sprintf("syncer-test-%s", id.TerminalString())) - if err != nil { - return nil, err - } - datadirs[id] = datadir - var store storage.ChunkStore - params := storage.NewDefaultLocalStoreParams() - params.ChunkDbPath = datadir - params.BaseKey = addr.Over() - store, err = storage.NewTestLocalStoreForAddr(params) - if err != nil { - return nil, err - } - return store, nil -} diff --git a/swarm/network/stream/syncer_test.go b/swarm/network/stream/syncer_test.go index a3d53e648..f72aa3444 100644 --- a/swarm/network/stream/syncer_test.go +++ b/swarm/network/stream/syncer_test.go @@ -23,18 +23,22 @@ import ( "io" "io/ioutil" "math" + "os" "sync" "testing" "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/node" + "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/discover" - "github.com/ethereum/go-ethereum/p2p/simulations" - "github.com/ethereum/go-ethereum/rpc" + "github.com/ethereum/go-ethereum/p2p/simulations/adapters" "github.com/ethereum/go-ethereum/swarm/log" "github.com/ethereum/go-ethereum/swarm/network" - streamTesting "github.com/ethereum/go-ethereum/swarm/network/stream/testing" + "github.com/ethereum/go-ethereum/swarm/network/simulation" + "github.com/ethereum/go-ethereum/swarm/state" "github.com/ethereum/go-ethereum/swarm/storage" + mockdb "github.com/ethereum/go-ethereum/swarm/storage/mock/db" ) const dataChunkCount = 200 @@ -46,222 +50,193 @@ func TestSyncerSimulation(t *testing.T) { testSyncBetweenNodes(t, 16, 1, dataChunkCount, true, 1) } -func createMockStore(id discover.NodeID, addr *network.BzzAddr) (storage.ChunkStore, error) { - var err error +func createMockStore(globalStore *mockdb.GlobalStore, id discover.NodeID, addr *network.BzzAddr) (lstore storage.ChunkStore, datadir string, err error) { address := common.BytesToAddress(id.Bytes()) mockStore := globalStore.NewNodeStore(address) params := storage.NewDefaultLocalStoreParams() - datadirs[id], err = ioutil.TempDir("", "localMockStore-"+id.TerminalString()) + + datadir, err = ioutil.TempDir("", "localMockStore-"+id.TerminalString()) if err != nil { - return nil, err + return nil, "", err } - params.Init(datadirs[id]) + params.Init(datadir) params.BaseKey = addr.Over() - lstore, err := storage.NewLocalStore(params, mockStore) - return lstore, nil + lstore, err = storage.NewLocalStore(params, mockStore) + return lstore, datadir, nil } func testSyncBetweenNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck bool, po uint8) { - defer setDefaultSkipCheck(defaultSkipCheck) - defaultSkipCheck = skipCheck - //data directories for each node and store - datadirs = make(map[discover.NodeID]string) - if *useMockStore { - createStoreFunc = createMockStore - createGlobalStore() - } else { - createStoreFunc = createTestLocalStorageFromSim - } - defer datadirsCleanup() + sim := simulation.New(map[string]simulation.ServiceFunc{ + "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { + var store storage.ChunkStore + var globalStore *mockdb.GlobalStore + var gDir, datadir string + + id := ctx.Config.ID + addr := network.NewAddrFromNodeID(id) + //hack to put addresses in same space + addr.OAddr[0] = byte(0) + + if *useMockStore { + gDir, globalStore, err = createGlobalStore() + if err != nil { + return nil, nil, fmt.Errorf("Something went wrong; using mockStore enabled but globalStore is nil") + } + store, datadir, err = createMockStore(globalStore, id, addr) + } else { + store, datadir, err = createTestLocalStorageForID(id, addr) + } + if err != nil { + return nil, nil, err + } + bucket.Store(bucketKeyStore, store) + cleanup = func() { + store.Close() + os.RemoveAll(datadir) + if *useMockStore { + err := globalStore.Close() + if err != nil { + log.Error("Error closing global store! %v", "err", err) + } + os.RemoveAll(gDir) + } + } + localStore := store.(*storage.LocalStore) + db := storage.NewDBAPI(localStore) + bucket.Store(bucketKeyDB, db) + kad := network.NewKademlia(addr.Over(), network.NewKadParams()) + delivery := NewDelivery(kad, db) + bucket.Store(bucketKeyDelivery, delivery) + + r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{ + SkipCheck: skipCheck, + }) + + fileStore := storage.NewFileStore(storage.NewNetStore(localStore, nil), storage.NewFileStoreParams()) + bucket.Store(bucketKeyFileStore, fileStore) + + return r, cleanup, nil + + }, + }) + defer sim.Close() - registries = make(map[discover.NodeID]*TestRegistry) - toAddr = func(id discover.NodeID) *network.BzzAddr { - addr := network.NewAddrFromNodeID(id) - //hack to put addresses in same space - addr.OAddr[0] = byte(0) - return addr - } - conf := &streamTesting.RunConfig{ - Adapter: *adapter, - NodeCount: nodes, - ConnLevel: conns, - ToAddr: toAddr, - Services: services, - EnableMsgEvents: false, - } - // HACK: these are global variables in the test so that they are available for - // the service constructor function - // TODO: will this work with exec/docker adapter? - // localstore of nodes made available for action and check calls - stores = make(map[discover.NodeID]storage.ChunkStore) - deliveries = make(map[discover.NodeID]*Delivery) // create context for simulation run timeout := 30 * time.Second ctx, cancel := context.WithTimeout(context.Background(), timeout) // defer cancel should come before defer simulation teardown defer cancel() - // create simulation network with the config - sim, teardown, err := streamTesting.NewSimulation(conf) - var rpcSubscriptionsWg sync.WaitGroup - defer func() { - rpcSubscriptionsWg.Wait() - teardown() - }() + _, err := sim.AddNodesAndConnectChain(nodes) if err != nil { - t.Fatal(err.Error()) + t.Fatal(err) } + result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error { + nodeIDs := sim.UpNodeIDs() - nodeIndex := make(map[discover.NodeID]int) - for i, id := range sim.IDs { - nodeIndex[id] = i - if !*useMockStore { - stores[id] = sim.Stores[i] - sim.Stores[i] = stores[id] + nodeIndex := make(map[discover.NodeID]int) + for i, id := range nodeIDs { + nodeIndex[id] = i } - } - // peerCount function gives the number of peer connections for a nodeID - // this is needed for the service run function to wait until - // each protocol instance runs and the streamer peers are available - peerCount = func(id discover.NodeID) int { - if sim.IDs[0] == id || sim.IDs[nodes-1] == id { - return 1 - } - return 2 - } - waitPeerErrC = make(chan error) - // create DBAPI-s for all nodes - dbs := make([]*storage.DBAPI, nodes) - for i := 0; i < nodes; i++ { - dbs[i] = storage.NewDBAPI(sim.Stores[i].(*storage.LocalStore)) - } + disconnections := sim.PeerEvents( + context.Background(), + sim.NodeIDs(), + simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeDrop), + ) - // collect hashes in po 1 bin for each node - hashes := make([][]storage.Address, nodes) - totalHashes := 0 - hashCounts := make([]int, nodes) - for i := nodes - 1; i >= 0; i-- { - if i < nodes-1 { - hashCounts[i] = hashCounts[i+1] - } - dbs[i].Iterator(0, math.MaxUint64, po, func(addr storage.Address, index uint64) bool { - hashes[i] = append(hashes[i], addr) - totalHashes++ - hashCounts[i]++ - return true - }) - } - - // errc is error channel for simulation - errc := make(chan error, 1) - quitC := make(chan struct{}) - defer close(quitC) - - // action is subscribe - action := func(ctx context.Context) error { - // need to wait till an aynchronous process registers the peers in streamer.peers - // that is used by Subscribe - // the global peerCount function tells how many connections each node has - // TODO: this is to be reimplemented with peerEvent watcher without global var - i := 0 - for err := range waitPeerErrC { - if err != nil { - return fmt.Errorf("error waiting for peers: %s", err) + go func() { + for d := range disconnections { + if d.Error != nil { + log.Error("peer drop", "node", d.NodeID, "peer", d.Event.Peer) + t.Fatal(d.Error) + } } - i++ - if i == nodes { - break - } - } + }() + // each node Subscribes to each other's swarmChunkServerStreamName for j := 0; j < nodes-1; j++ { - id := sim.IDs[j] - sim.Stores[j] = stores[id] - err := sim.CallClient(id, func(client *rpc.Client) error { - // report disconnect events to the error channel cos peers should not disconnect - doneC, err := streamTesting.WatchDisconnections(id, client, errc, quitC) - if err != nil { - return err - } - rpcSubscriptionsWg.Add(1) - go func() { - <-doneC - rpcSubscriptionsWg.Done() - }() - ctx, cancel := context.WithTimeout(ctx, 1*time.Second) - defer cancel() - // start syncing, i.e., subscribe to upstream peers po 1 bin - sid := sim.IDs[j+1] - return client.CallContext(ctx, nil, "stream_subscribeStream", sid, NewStream("SYNC", FormatSyncBinKey(1), false), NewRange(0, 0), Top) - }) + id := nodeIDs[j] + client, err := sim.Net.GetNode(id).Client() + if err != nil { + t.Fatal(err) + } + sid := nodeIDs[j+1] + client.CallContext(ctx, nil, "stream_subscribeStream", sid, NewStream("SYNC", FormatSyncBinKey(1), false), NewRange(0, 0), Top) if err != nil { return err } - } - // here we distribute chunks of a random file into stores 1...nodes - rrFileStore := storage.NewFileStore(newRoundRobinStore(sim.Stores[1:]...), storage.NewFileStoreParams()) - size := chunkCount * chunkSize - _, wait, err := rrFileStore.Store(ctx, io.LimitReader(crand.Reader, int64(size)), int64(size), false) - if err != nil { - t.Fatal(err.Error()) - } - // need to wait cos we then immediately collect the relevant bin content - wait(ctx) - if err != nil { - t.Fatal(err.Error()) - } - - return nil - } - - // this makes sure check is not called before the previous call finishes - check := func(ctx context.Context, id discover.NodeID) (bool, error) { - select { - case err := <-errc: - return false, err - case <-ctx.Done(): - return false, ctx.Err() - default: - } - - i := nodeIndex[id] - var total, found int - - for j := i; j < nodes; j++ { - total += len(hashes[j]) - for _, key := range hashes[j] { - chunk, err := dbs[i].Get(ctx, key) - if err == storage.ErrFetching { - <-chunk.ReqC - } else if err != nil { - continue + if j > 0 || nodes == 2 { + item, ok := sim.NodeItem(nodeIDs[j], bucketKeyFileStore) + if !ok { + return fmt.Errorf("No filestore") } - // needed for leveldb not to be closed? - // chunk.WaitToStore() - found++ + fileStore := item.(*storage.FileStore) + size := chunkCount * chunkSize + _, wait, err := fileStore.Store(ctx, io.LimitReader(crand.Reader, int64(size)), int64(size), false) + if err != nil { + t.Fatal(err.Error()) + } + wait(ctx) } } - log.Debug("sync check", "node", id, "index", i, "bin", po, "found", found, "total", total) - return total == found, nil - } + // here we distribute chunks of a random file into stores 1...nodes + if _, err := sim.WaitTillHealthy(ctx, 2); err != nil { + return err + } + + // collect hashes in po 1 bin for each node + hashes := make([][]storage.Address, nodes) + totalHashes := 0 + hashCounts := make([]int, nodes) + for i := nodes - 1; i >= 0; i-- { + if i < nodes-1 { + hashCounts[i] = hashCounts[i+1] + } + item, ok := sim.NodeItem(nodeIDs[i], bucketKeyDB) + if !ok { + return fmt.Errorf("No DB") + } + db := item.(*storage.DBAPI) + db.Iterator(0, math.MaxUint64, po, func(addr storage.Address, index uint64) bool { + hashes[i] = append(hashes[i], addr) + totalHashes++ + hashCounts[i]++ + return true + }) + } + var total, found int + for _, node := range nodeIDs { + i := nodeIndex[node] + + for j := i; j < nodes; j++ { + total += len(hashes[j]) + for _, key := range hashes[j] { + item, ok := sim.NodeItem(nodeIDs[j], bucketKeyDB) + if !ok { + return fmt.Errorf("No DB") + } + db := item.(*storage.DBAPI) + chunk, err := db.Get(ctx, key) + if err == storage.ErrFetching { + <-chunk.ReqC + } else if err != nil { + continue + } + // needed for leveldb not to be closed? + // chunk.WaitToStore() + found++ + } + } + log.Debug("sync check", "node", node, "index", i, "bin", po, "found", found, "total", total) + } + if total == found && total > 0 { + return nil + } + return fmt.Errorf("Total not equallying found: total is %d", total) + }) - conf.Step = &simulations.Step{ - Action: action, - Trigger: streamTesting.Trigger(500*time.Millisecond, quitC, sim.IDs[0:nodes-1]...), - Expect: &simulations.Expectation{ - Nodes: sim.IDs[0:1], - Check: check, - }, - } - startedAt := time.Now() - result, err := sim.Run(ctx, conf) - finishedAt := time.Now() - if err != nil { - t.Fatalf("Setting up simulation failed: %v", err) - } if result.Error != nil { - t.Fatalf("Simulation failed: %s", result.Error) + t.Fatal(result.Error) } - streamTesting.CheckResult(t, result, startedAt, finishedAt) } diff --git a/swarm/network/stream/testing/testing.go b/swarm/network/stream/testing/testing.go deleted file mode 100644 index d584ec397..000000000 --- a/swarm/network/stream/testing/testing.go +++ /dev/null @@ -1,293 +0,0 @@ -// Copyright 2018 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see . - -package testing - -import ( - "context" - "errors" - "fmt" - "io/ioutil" - "math/rand" - "os" - "sync" - "testing" - "time" - - "github.com/ethereum/go-ethereum/p2p" - "github.com/ethereum/go-ethereum/p2p/discover" - "github.com/ethereum/go-ethereum/p2p/simulations" - "github.com/ethereum/go-ethereum/p2p/simulations/adapters" - "github.com/ethereum/go-ethereum/rpc" - "github.com/ethereum/go-ethereum/swarm/log" - "github.com/ethereum/go-ethereum/swarm/network" - "github.com/ethereum/go-ethereum/swarm/storage" -) - -type Simulation struct { - Net *simulations.Network - Stores []storage.ChunkStore - Addrs []network.Addr - IDs []discover.NodeID -} - -func SetStores(addrs ...network.Addr) ([]storage.ChunkStore, func(), error) { - var datadirs []string - stores := make([]storage.ChunkStore, len(addrs)) - var err error - for i, addr := range addrs { - var datadir string - datadir, err = ioutil.TempDir("", "streamer") - if err != nil { - break - } - var store storage.ChunkStore - params := storage.NewDefaultLocalStoreParams() - params.Init(datadir) - params.BaseKey = addr.Over() - store, err = storage.NewTestLocalStoreForAddr(params) - if err != nil { - break - } - datadirs = append(datadirs, datadir) - stores[i] = store - } - teardown := func() { - for i, datadir := range datadirs { - stores[i].Close() - os.RemoveAll(datadir) - } - } - return stores, teardown, err -} - -func NewAdapter(adapterType string, services adapters.Services) (adapter adapters.NodeAdapter, teardown func(), err error) { - teardown = func() {} - switch adapterType { - case "sim": - adapter = adapters.NewSimAdapter(services) - case "exec": - baseDir, err0 := ioutil.TempDir("", "swarm-test") - if err0 != nil { - return nil, teardown, err0 - } - teardown = func() { os.RemoveAll(baseDir) } - adapter = adapters.NewExecAdapter(baseDir) - case "docker": - adapter, err = adapters.NewDockerAdapter() - if err != nil { - return nil, teardown, err - } - default: - return nil, teardown, errors.New("adapter needs to be one of sim, exec, docker") - } - return adapter, teardown, nil -} - -func CheckResult(t *testing.T, result *simulations.StepResult, startedAt, finishedAt time.Time) { - t.Logf("Simulation passed in %s", result.FinishedAt.Sub(result.StartedAt)) - if len(result.Passes) > 1 { - var min, max time.Duration - var sum int - for _, pass := range result.Passes { - duration := pass.Sub(result.StartedAt) - if sum == 0 || duration < min { - min = duration - } - if duration > max { - max = duration - } - sum += int(duration.Nanoseconds()) - } - t.Logf("Min: %s, Max: %s, Average: %s", min, max, time.Duration(sum/len(result.Passes))*time.Nanosecond) - } - t.Logf("Setup: %s, Shutdown: %s", result.StartedAt.Sub(startedAt), finishedAt.Sub(result.FinishedAt)) -} - -type RunConfig struct { - Adapter string - Step *simulations.Step - NodeCount int - ConnLevel int - ToAddr func(discover.NodeID) *network.BzzAddr - Services adapters.Services - DefaultService string - EnableMsgEvents bool -} - -func NewSimulation(conf *RunConfig) (*Simulation, func(), error) { - // create network - nodes := conf.NodeCount - adapter, adapterTeardown, err := NewAdapter(conf.Adapter, conf.Services) - if err != nil { - return nil, adapterTeardown, err - } - defaultService := "streamer" - if conf.DefaultService != "" { - defaultService = conf.DefaultService - } - net := simulations.NewNetwork(adapter, &simulations.NetworkConfig{ - ID: "0", - DefaultService: defaultService, - }) - teardown := func() { - adapterTeardown() - net.Shutdown() - } - ids := make([]discover.NodeID, nodes) - addrs := make([]network.Addr, nodes) - // start nodes - for i := 0; i < nodes; i++ { - nodeconf := adapters.RandomNodeConfig() - nodeconf.EnableMsgEvents = conf.EnableMsgEvents - node, err := net.NewNodeWithConfig(nodeconf) - if err != nil { - return nil, teardown, fmt.Errorf("error creating node: %s", err) - } - ids[i] = node.ID() - addrs[i] = conf.ToAddr(ids[i]) - } - // set nodes number of Stores available - stores, storeTeardown, err := SetStores(addrs...) - teardown = func() { - net.Shutdown() - adapterTeardown() - storeTeardown() - } - if err != nil { - return nil, teardown, err - } - s := &Simulation{ - Net: net, - Stores: stores, - IDs: ids, - Addrs: addrs, - } - return s, teardown, nil -} - -func (s *Simulation) Run(ctx context.Context, conf *RunConfig) (*simulations.StepResult, error) { - // bring up nodes, launch the servive - nodes := conf.NodeCount - conns := conf.ConnLevel - for i := 0; i < nodes; i++ { - if err := s.Net.Start(s.IDs[i]); err != nil { - return nil, fmt.Errorf("error starting node %s: %s", s.IDs[i].TerminalString(), err) - } - } - // run a simulation which connects the 10 nodes in a chain - wg := sync.WaitGroup{} - for i := range s.IDs { - // collect the overlay addresses, to - for j := 0; j < conns; j++ { - var k int - if j == 0 { - k = i - 1 - } else { - k = rand.Intn(len(s.IDs)) - } - if i > 0 { - wg.Add(1) - go func(i, k int) { - defer wg.Done() - s.Net.Connect(s.IDs[i], s.IDs[k]) - }(i, k) - } - } - } - wg.Wait() - log.Info(fmt.Sprintf("simulation with %v nodes", len(s.Addrs))) - - // create an only locally retrieving FileStore for the pivot node to test - // if retriee requests have arrived - result := simulations.NewSimulation(s.Net).Run(ctx, conf.Step) - return result, nil -} - -// WatchDisconnections subscribes to admin peerEvents and sends peer event drop -// errors to the errc channel. Channel quitC signals the termination of the event loop. -// Returned doneC will be closed after the rpc subscription is unsubscribed, -// signaling that simulations network is safe to shutdown. -func WatchDisconnections(id discover.NodeID, client *rpc.Client, errc chan error, quitC chan struct{}) (doneC <-chan struct{}, err error) { - events := make(chan *p2p.PeerEvent) - sub, err := client.Subscribe(context.Background(), "admin", events, "peerEvents") - if err != nil { - return nil, fmt.Errorf("error getting peer events for node %v: %s", id, err) - } - c := make(chan struct{}) - go func() { - defer func() { - log.Trace("watch disconnections: unsubscribe", "id", id) - sub.Unsubscribe() - close(c) - }() - for { - select { - case <-quitC: - return - case e := <-events: - if e.Type == p2p.PeerEventTypeDrop { - select { - case errc <- fmt.Errorf("peerEvent for node %v: %v", id, e): - case <-quitC: - return - } - } - case err := <-sub.Err(): - if err != nil { - select { - case errc <- fmt.Errorf("error getting peer events for node %v: %v", id, err): - case <-quitC: - return - } - } - } - } - }() - return c, nil -} - -func Trigger(d time.Duration, quitC chan struct{}, ids ...discover.NodeID) chan discover.NodeID { - trigger := make(chan discover.NodeID) - go func() { - defer close(trigger) - ticker := time.NewTicker(d) - defer ticker.Stop() - // we are only testing the pivot node (net.Nodes[0]) - for range ticker.C { - for _, id := range ids { - select { - case trigger <- id: - case <-quitC: - return - } - } - } - }() - return trigger -} - -func (sim *Simulation) CallClient(id discover.NodeID, f func(*rpc.Client) error) error { - node := sim.Net.GetNode(id) - if node == nil { - return fmt.Errorf("unknown node: %s", id) - } - client, err := node.Client() - if err != nil { - return fmt.Errorf("error getting node client: %s", err) - } - return f(client) -}