diff --git a/swarm/network/stream/delivery_test.go b/swarm/network/stream/delivery_test.go index 949645558..29b4f2f69 100644 --- a/swarm/network/stream/delivery_test.go +++ b/swarm/network/stream/delivery_test.go @@ -38,8 +38,13 @@ import ( "github.com/ethereum/go-ethereum/swarm/storage" ) +//Tests initializing a retrieve request func TestStreamerRetrieveRequest(t *testing.T) { - tester, streamer, _, teardown, err := newStreamerTester(t, nil) + regOpts := &RegistryOptions{ + Retrieval: RetrievalClientOnly, + Syncing: SyncingDisabled, + } + tester, streamer, _, teardown, err := newStreamerTester(t, regOpts) defer teardown() if err != nil { t.Fatal(err) @@ -55,10 +60,21 @@ func TestStreamerRetrieveRequest(t *testing.T) { ) streamer.delivery.RequestFromPeers(ctx, req) + stream := NewStream(swarmChunkServerStreamName, "", true) + err = tester.TestExchanges(p2ptest.Exchange{ Label: "RetrieveRequestMsg", Expects: []p2ptest.Expect{ - { + { //start expecting a subscription for RETRIEVE_REQUEST due to `RetrievalClientOnly` + Code: 4, + Msg: &SubscribeMsg{ + Stream: stream, + History: nil, + Priority: Top, + }, + Peer: node.ID(), + }, + { //expect a retrieve request message for the given hash Code: 5, Msg: &RetrieveRequestMsg{ Addr: hash0[:], @@ -74,9 +90,12 @@ func TestStreamerRetrieveRequest(t *testing.T) { } } +//Test requesting a chunk from a peer then issuing a "empty" OfferedHashesMsg (no hashes available yet) +//Should time out as the peer does not have the chunk (no syncing happened previously) func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) { tester, streamer, _, teardown, err := newStreamerTester(t, &RegistryOptions{ - DoServeRetrieve: true, + Retrieval: RetrievalEnabled, + Syncing: SyncingDisabled, //do no syncing }) defer teardown() if err != nil { @@ -89,16 +108,31 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) { peer := streamer.getPeer(node.ID()) + stream := NewStream(swarmChunkServerStreamName, "", true) + //simulate pre-subscription to RETRIEVE_REQUEST stream on peer peer.handleSubscribeMsg(context.TODO(), &SubscribeMsg{ - Stream: NewStream(swarmChunkServerStreamName, "", true), + Stream: stream, History: nil, Priority: Top, }) + //test the exchange err = tester.TestExchanges(p2ptest.Exchange{ + Expects: []p2ptest.Expect{ + { //first expect a subscription to the RETRIEVE_REQUEST stream + Code: 4, + Msg: &SubscribeMsg{ + Stream: stream, + History: nil, + Priority: Top, + }, + Peer: node.ID(), + }, + }, + }, p2ptest.Exchange{ Label: "RetrieveRequestMsg", Triggers: []p2ptest.Trigger{ - { + { //then the actual RETRIEVE_REQUEST.... Code: 5, Msg: &RetrieveRequestMsg{ Addr: chunk.Address()[:], @@ -107,7 +141,7 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) { }, }, Expects: []p2ptest.Expect{ - { + { //to which the peer responds with offered hashes Code: 1, Msg: &OfferedHashesMsg{ HandoverProof: nil, @@ -120,7 +154,9 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) { }, }) - expectedError := `exchange #0 "RetrieveRequestMsg": timed out` + //should fail with a timeout as the peer we are requesting + //the chunk from does not have the chunk + expectedError := `exchange #1 "RetrieveRequestMsg": timed out` if err == nil || err.Error() != expectedError { t.Fatalf("Expected error %v, got %v", expectedError, err) } @@ -130,7 +166,8 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) { // offered hashes or delivery if skipHash is set to true func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) { tester, streamer, localStore, teardown, err := newStreamerTester(t, &RegistryOptions{ - DoServeRetrieve: true, + Retrieval: RetrievalEnabled, + Syncing: SyncingDisabled, }) defer teardown() if err != nil { @@ -138,6 +175,7 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) { } node := tester.Nodes[0] + peer := streamer.getPeer(node.ID()) stream := NewStream(swarmChunkServerStreamName, "", true) @@ -156,6 +194,18 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) { } err = tester.TestExchanges(p2ptest.Exchange{ + Expects: []p2ptest.Expect{ + { + Code: 4, + Msg: &SubscribeMsg{ + Stream: stream, + History: nil, + Priority: Top, + }, + Peer: node.ID(), + }, + }, + }, p2ptest.Exchange{ Label: "RetrieveRequestMsg", Triggers: []p2ptest.Trigger{ { @@ -226,7 +276,8 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) { func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) { tester, streamer, localStore, teardown, err := newStreamerTester(t, &RegistryOptions{ - DoServeRetrieve: true, + Retrieval: RetrievalDisabled, + Syncing: SyncingDisabled, }) defer teardown() if err != nil { @@ -241,6 +292,7 @@ func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) { node := tester.Nodes[0] + //subscribe to custom stream stream := NewStream("foo", "", true) err = streamer.Subscribe(node.ID(), stream, NewRange(5, 8), Top) if err != nil { @@ -253,7 +305,7 @@ func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) { err = tester.TestExchanges(p2ptest.Exchange{ Label: "Subscribe message", Expects: []p2ptest.Expect{ - { + { //first expect subscription to the custom stream... Code: 4, Msg: &SubscribeMsg{ Stream: stream, @@ -267,7 +319,8 @@ func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) { p2ptest.Exchange{ Label: "ChunkDelivery message", Triggers: []p2ptest.Trigger{ - { + { //...then trigger a chunk delivery for the given chunk from peer in order for + //local node to get the chunk delivered Code: 6, Msg: &ChunkDeliveryMsg{ Addr: chunkKey, @@ -342,8 +395,9 @@ func testDeliveryFromNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ - SkipCheck: skipCheck, - DoServeRetrieve: true, + SkipCheck: skipCheck, + Syncing: SyncingDisabled, + Retrieval: RetrievalEnabled, }) bucket.Store(bucketKeyRegistry, r) @@ -408,20 +462,6 @@ func testDeliveryFromNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck return err } - //each of the nodes (except pivot node) subscribes to the stream of the next node - for j, node := range nodeIDs[0 : nodes-1] { - sid := nodeIDs[j+1] - item, ok := sim.NodeItem(node, bucketKeyRegistry) - if !ok { - return fmt.Errorf("No registry") - } - registry := item.(*Registry) - err = registry.Subscribe(sid, NewStream(swarmChunkServerStreamName, "", true), nil, Top) - if err != nil { - return err - } - } - //get the pivot node's filestore item, ok := sim.NodeItem(*sim.PivotNodeID(), bucketKeyFileStore) if !ok { @@ -530,7 +570,8 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, conns, chunkCount int, skip r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ SkipCheck: skipCheck, - DoSync: true, + Syncing: SyncingDisabled, + Retrieval: RetrievalDisabled, SyncUpdateDelay: 0, }) diff --git a/swarm/network/stream/intervals_test.go b/swarm/network/stream/intervals_test.go index 3164193b3..0c95fabb7 100644 --- a/swarm/network/stream/intervals_test.go +++ b/swarm/network/stream/intervals_test.go @@ -83,6 +83,8 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) { netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ + Retrieval: RetrievalDisabled, + Syncing: SyncingRegisterOnly, SkipCheck: skipCheck, }) bucket.Store(bucketKeyRegistry, r) diff --git a/swarm/network/stream/lightnode_test.go b/swarm/network/stream/lightnode_test.go index 0d3bc7f54..65cde2411 100644 --- a/swarm/network/stream/lightnode_test.go +++ b/swarm/network/stream/lightnode_test.go @@ -25,7 +25,8 @@ import ( // when it is serving Retrieve requests. func TestLigthnodeRetrieveRequestWithRetrieve(t *testing.T) { registryOptions := &RegistryOptions{ - DoServeRetrieve: true, + Retrieval: RetrievalClientOnly, + Syncing: SyncingDisabled, } tester, _, _, teardown, err := newStreamerTester(t, registryOptions) defer teardown() @@ -63,7 +64,8 @@ func TestLigthnodeRetrieveRequestWithRetrieve(t *testing.T) { // requests are disabled func TestLigthnodeRetrieveRequestWithoutRetrieve(t *testing.T) { registryOptions := &RegistryOptions{ - DoServeRetrieve: false, + Retrieval: RetrievalDisabled, + Syncing: SyncingDisabled, } tester, _, _, teardown, err := newStreamerTester(t, registryOptions) defer teardown() @@ -106,7 +108,8 @@ func TestLigthnodeRetrieveRequestWithoutRetrieve(t *testing.T) { // when syncing is enabled. func TestLigthnodeRequestSubscriptionWithSync(t *testing.T) { registryOptions := &RegistryOptions{ - DoSync: true, + Retrieval: RetrievalDisabled, + Syncing: SyncingRegisterOnly, } tester, _, _, teardown, err := newStreamerTester(t, registryOptions) defer teardown() @@ -150,7 +153,8 @@ func TestLigthnodeRequestSubscriptionWithSync(t *testing.T) { // when syncing is disabled. func TestLigthnodeRequestSubscriptionWithoutSync(t *testing.T) { registryOptions := &RegistryOptions{ - DoSync: false, + Retrieval: RetrievalDisabled, + Syncing: SyncingDisabled, } tester, _, _, teardown, err := newStreamerTester(t, registryOptions) defer teardown() diff --git a/swarm/network/stream/snapshot_retrieval_test.go b/swarm/network/stream/snapshot_retrieval_test.go index b81cfc0ca..ad1519341 100644 --- a/swarm/network/stream/snapshot_retrieval_test.go +++ b/swarm/network/stream/snapshot_retrieval_test.go @@ -127,10 +127,9 @@ func retrievalStreamerFunc(ctx *adapters.ServiceContext, bucket *sync.Map) (s no netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ - DoSync: true, + Retrieval: RetrievalEnabled, + Syncing: SyncingAutoSubscribe, SyncUpdateDelay: 3 * time.Second, - DoRetrieve: true, - DoServeRetrieve: true, }) fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams()) diff --git a/swarm/network/stream/snapshot_sync_test.go b/swarm/network/stream/snapshot_sync_test.go index 8d89f28cb..2ddbed936 100644 --- a/swarm/network/stream/snapshot_sync_test.go +++ b/swarm/network/stream/snapshot_sync_test.go @@ -165,8 +165,8 @@ func streamerFunc(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Servic netStore.NewNetFetcherFunc = network.NewFetcherFactory(dummyRequestFromPeers, true).New r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ - DoSync: true, - DoServeRetrieve: true, + Retrieval: RetrievalDisabled, + Syncing: SyncingAutoSubscribe, SyncUpdateDelay: 3 * time.Second, }) @@ -360,8 +360,8 @@ func testSyncingViaDirectSubscribe(t *testing.T, chunkCount int, nodeCount int) netStore.NewNetFetcherFunc = network.NewFetcherFactory(dummyRequestFromPeers, true).New r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ - DoServeRetrieve: true, - DoSync: true, + Retrieval: RetrievalDisabled, + Syncing: SyncingRegisterOnly, }) bucket.Store(bucketKeyRegistry, r) diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go index 0ac374def..695ff0c50 100644 --- a/swarm/network/stream/stream.go +++ b/swarm/network/stream/stream.go @@ -47,6 +47,31 @@ const ( HashSize = 32 ) +//Enumerate options for syncing and retrieval +type SyncingOption int +type RetrievalOption int + +//Syncing options +const ( + //Syncing disabled + SyncingDisabled SyncingOption = iota + //Register the client and the server but not subscribe + SyncingRegisterOnly + //Both client and server funcs are registered, subscribe sent automatically + SyncingAutoSubscribe +) + +const ( + //Retrieval disabled. Used mostly for tests to isolate syncing features (i.e. syncing only) + RetrievalDisabled RetrievalOption = iota + //Only the client side of the retrieve request is registered. + //(light nodes do not serve retrieve requests) + //once the client is registered, subscription to retrieve request stream is always sent + RetrievalClientOnly + //Both client and server funcs are registered, subscribe sent automatically + RetrievalEnabled +) + // Registry registry for outgoing and incoming streamer constructors type Registry struct { addr enode.ID @@ -60,16 +85,15 @@ type Registry struct { peers map[enode.ID]*Peer delivery *Delivery intervalsStore state.Store - doRetrieve bool + autoRetrieval bool //automatically subscribe to retrieve request stream maxPeerServers int } // RegistryOptions holds optional values for NewRegistry constructor. type RegistryOptions struct { SkipCheck bool - DoSync bool // Sets if the server syncs with peers. Default is true, set to false by lightnode or nosync flags. - DoRetrieve bool // Sets if the server issues Retrieve requests. Default is true. - DoServeRetrieve bool // Sets if the server serves Retrieve requests. Default is true, set to false by lightnode flag. + Syncing SyncingOption //Defines syncing behavior + Retrieval RetrievalOption //Defines retrieval behavior SyncUpdateDelay time.Duration MaxPeerServers int // The limit of servers for each peer in registry } @@ -82,6 +106,9 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy if options.SyncUpdateDelay <= 0 { options.SyncUpdateDelay = 15 * time.Second } + //check if retriaval has been disabled + retrieval := options.Retrieval != RetrievalDisabled + streamer := &Registry{ addr: localID, skipCheck: options.SkipCheck, @@ -90,13 +117,14 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy peers: make(map[enode.ID]*Peer), delivery: delivery, intervalsStore: intervalsStore, - doRetrieve: options.DoRetrieve, + autoRetrieval: retrieval, maxPeerServers: options.MaxPeerServers, } streamer.api = NewAPI(streamer) delivery.getPeer = streamer.getPeer - if options.DoServeRetrieve { + //if retrieval is enabled, register the server func, so that retrieve requests will be served (non-light nodes only) + if options.Retrieval == RetrievalEnabled { streamer.RegisterServerFunc(swarmChunkServerStreamName, func(_ *Peer, _ string, live bool) (Server, error) { if !live { return nil, errors.New("only live retrieval requests supported") @@ -105,16 +133,21 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy }) } - streamer.RegisterClientFunc(swarmChunkServerStreamName, func(p *Peer, t string, live bool) (Client, error) { - return NewSwarmSyncerClient(p, syncChunkStore, NewStream(swarmChunkServerStreamName, t, live)) - }) + //if retrieval is not disabled, register the client func (both light nodes and normal nodes can issue retrieve requests) + if options.Retrieval != RetrievalDisabled { + streamer.RegisterClientFunc(swarmChunkServerStreamName, func(p *Peer, t string, live bool) (Client, error) { + return NewSwarmSyncerClient(p, syncChunkStore, NewStream(swarmChunkServerStreamName, t, live)) + }) + } - if options.DoSync { + //If syncing is not disabled, the syncing functions are registered (both client and server) + if options.Syncing != SyncingDisabled { RegisterSwarmSyncerServer(streamer, syncChunkStore) RegisterSwarmSyncerClient(streamer, syncChunkStore) } - if options.DoSync { + //if syncing is set to automatically subscribe to the syncing stream, start the subscription process + if options.Syncing == SyncingAutoSubscribe { // latestIntC function ensures that // - receiving from the in chan is not blocked by processing inside the for loop // - the latest int value is delivered to the loop after the processing is done @@ -385,7 +418,7 @@ func (r *Registry) Run(p *network.BzzPeer) error { defer close(sp.quit) defer sp.close() - if r.doRetrieve { + if r.autoRetrieval && !p.LightNode { err := r.Subscribe(p.ID(), NewStream(swarmChunkServerStreamName, "", true), nil, Top) if err != nil { return err diff --git a/swarm/network/stream/streamer_test.go b/swarm/network/stream/streamer_test.go index e7f79e7a1..16c74d3b3 100644 --- a/swarm/network/stream/streamer_test.go +++ b/swarm/network/stream/streamer_test.go @@ -765,6 +765,8 @@ func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) { func TestMaxPeerServersWithUnsubscribe(t *testing.T) { var maxPeerServers = 6 tester, streamer, _, teardown, err := newStreamerTester(t, &RegistryOptions{ + Retrieval: RetrievalDisabled, + Syncing: SyncingDisabled, MaxPeerServers: maxPeerServers, }) defer teardown() diff --git a/swarm/network/stream/syncer_test.go b/swarm/network/stream/syncer_test.go index f2be3bef9..113807b98 100644 --- a/swarm/network/stream/syncer_test.go +++ b/swarm/network/stream/syncer_test.go @@ -114,6 +114,8 @@ func testSyncBetweenNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck bucket.Store(bucketKeyDelivery, delivery) r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ + Retrieval: RetrievalDisabled, + Syncing: SyncingAutoSubscribe, SkipCheck: skipCheck, }) diff --git a/swarm/swarm.go b/swarm/swarm.go index 7214abbda..1fb5443fd 100644 --- a/swarm/swarm.go +++ b/swarm/swarm.go @@ -175,18 +175,24 @@ func NewSwarm(config *api.Config, mockStore *mock.NodeStore) (self *Swarm, err e if err := nodeID.UnmarshalText([]byte(config.NodeID)); err != nil { return nil, err } + + syncing := stream.SyncingAutoSubscribe + if !config.SyncEnabled || config.LightNodeEnabled { + syncing = stream.SyncingDisabled + } + + retrieval := stream.RetrievalEnabled + if config.LightNodeEnabled { + retrieval = stream.RetrievalClientOnly + } + registryOptions := &stream.RegistryOptions{ SkipCheck: config.DeliverySkipCheck, - DoSync: config.SyncEnabled, - DoRetrieve: true, - DoServeRetrieve: true, + Syncing: syncing, + Retrieval: retrieval, SyncUpdateDelay: config.SyncUpdateDelay, MaxPeerServers: config.MaxStreamPeerServers, } - if config.LightNodeEnabled { - registryOptions.DoSync = false - registryOptions.DoRetrieve = false - } self.streamer = stream.NewRegistry(nodeID, delivery, self.netStore, stateStore, registryOptions) // Swarm Hash Merklised Chunking for Arbitrary-length Document/File storage