diff --git a/cmd/swarm/config.go b/cmd/swarm/config.go index c2f885d25..9d6fe41a7 100644 --- a/cmd/swarm/config.go +++ b/cmd/swarm/config.go @@ -59,27 +59,28 @@ var ( //constants for environment variables const ( - SWARM_ENV_CHEQUEBOOK_ADDR = "SWARM_CHEQUEBOOK_ADDR" - SWARM_ENV_ACCOUNT = "SWARM_ACCOUNT" - SWARM_ENV_LISTEN_ADDR = "SWARM_LISTEN_ADDR" - SWARM_ENV_PORT = "SWARM_PORT" - SWARM_ENV_NETWORK_ID = "SWARM_NETWORK_ID" - SWARM_ENV_SWAP_ENABLE = "SWARM_SWAP_ENABLE" - SWARM_ENV_SWAP_API = "SWARM_SWAP_API" - SWARM_ENV_SYNC_DISABLE = "SWARM_SYNC_DISABLE" - SWARM_ENV_SYNC_UPDATE_DELAY = "SWARM_ENV_SYNC_UPDATE_DELAY" - SWARM_ENV_LIGHT_NODE_ENABLE = "SWARM_LIGHT_NODE_ENABLE" - SWARM_ENV_DELIVERY_SKIP_CHECK = "SWARM_DELIVERY_SKIP_CHECK" - SWARM_ENV_ENS_API = "SWARM_ENS_API" - SWARM_ENV_ENS_ADDR = "SWARM_ENS_ADDR" - SWARM_ENV_CORS = "SWARM_CORS" - SWARM_ENV_BOOTNODES = "SWARM_BOOTNODES" - SWARM_ENV_PSS_ENABLE = "SWARM_PSS_ENABLE" - SWARM_ENV_STORE_PATH = "SWARM_STORE_PATH" - SWARM_ENV_STORE_CAPACITY = "SWARM_STORE_CAPACITY" - SWARM_ENV_STORE_CACHE_CAPACITY = "SWARM_STORE_CACHE_CAPACITY" - SWARM_ACCESS_PASSWORD = "SWARM_ACCESS_PASSWORD" - GETH_ENV_DATADIR = "GETH_DATADIR" + SWARM_ENV_CHEQUEBOOK_ADDR = "SWARM_CHEQUEBOOK_ADDR" + SWARM_ENV_ACCOUNT = "SWARM_ACCOUNT" + SWARM_ENV_LISTEN_ADDR = "SWARM_LISTEN_ADDR" + SWARM_ENV_PORT = "SWARM_PORT" + SWARM_ENV_NETWORK_ID = "SWARM_NETWORK_ID" + SWARM_ENV_SWAP_ENABLE = "SWARM_SWAP_ENABLE" + SWARM_ENV_SWAP_API = "SWARM_SWAP_API" + SWARM_ENV_SYNC_DISABLE = "SWARM_SYNC_DISABLE" + SWARM_ENV_SYNC_UPDATE_DELAY = "SWARM_ENV_SYNC_UPDATE_DELAY" + SWARM_ENV_MAX_STREAM_PEER_SERVERS = "SWARM_ENV_MAX_STREAM_PEER_SERVERS" + SWARM_ENV_LIGHT_NODE_ENABLE = "SWARM_LIGHT_NODE_ENABLE" + SWARM_ENV_DELIVERY_SKIP_CHECK = "SWARM_DELIVERY_SKIP_CHECK" + SWARM_ENV_ENS_API = "SWARM_ENS_API" + SWARM_ENV_ENS_ADDR = "SWARM_ENS_ADDR" + SWARM_ENV_CORS = "SWARM_CORS" + SWARM_ENV_BOOTNODES = "SWARM_BOOTNODES" + SWARM_ENV_PSS_ENABLE = "SWARM_PSS_ENABLE" + SWARM_ENV_STORE_PATH = "SWARM_STORE_PATH" + SWARM_ENV_STORE_CAPACITY = "SWARM_STORE_CAPACITY" + SWARM_ENV_STORE_CACHE_CAPACITY = "SWARM_STORE_CACHE_CAPACITY" + SWARM_ACCESS_PASSWORD = "SWARM_ACCESS_PASSWORD" + GETH_ENV_DATADIR = "GETH_DATADIR" ) // These settings ensure that TOML keys use the same names as Go struct fields. @@ -207,6 +208,9 @@ func cmdLineOverride(currentConfig *bzzapi.Config, ctx *cli.Context) *bzzapi.Con currentConfig.SyncUpdateDelay = d } + // any value including 0 is acceptable + currentConfig.MaxStreamPeerServers = ctx.GlobalInt(SwarmMaxStreamPeerServersFlag.Name) + if ctx.GlobalIsSet(SwarmLightNodeEnabled.Name) { currentConfig.LightNodeEnabled = true } @@ -308,6 +312,14 @@ func envVarsOverride(currentConfig *bzzapi.Config) (config *bzzapi.Config) { } } + if max := os.Getenv(SWARM_ENV_MAX_STREAM_PEER_SERVERS); max != "" { + m, err := strconv.Atoi(max) + if err != nil { + utils.Fatalf("invalid environment variable %s: %v", SWARM_ENV_MAX_STREAM_PEER_SERVERS, err) + } + currentConfig.MaxStreamPeerServers = m + } + if lne := os.Getenv(SWARM_ENV_LIGHT_NODE_ENABLE); lne != "" { if lightnode, err := strconv.ParseBool(lne); err != nil { currentConfig.LightNodeEnabled = lightnode diff --git a/cmd/swarm/main.go b/cmd/swarm/main.go index 4c9ce931e..71d707c2b 100644 --- a/cmd/swarm/main.go +++ b/cmd/swarm/main.go @@ -116,6 +116,12 @@ var ( Usage: "Duration for sync subscriptions update after no new peers are added (default 15s)", EnvVar: SWARM_ENV_SYNC_UPDATE_DELAY, } + SwarmMaxStreamPeerServersFlag = cli.IntFlag{ + Name: "max-stream-peer-servers", + Usage: "Limit of Stream peer servers, 0 denotes unlimited", + EnvVar: SWARM_ENV_MAX_STREAM_PEER_SERVERS, + Value: 10000, // A very large default value is possible as stream servers have very small memory footprint + } SwarmLightNodeEnabled = cli.BoolFlag{ Name: "lightnode", Usage: "Enable Swarm LightNode (default false)", @@ -542,6 +548,7 @@ pv(1) tool to get a progress bar: SwarmSwapAPIFlag, SwarmSyncDisabledFlag, SwarmSyncUpdateDelay, + SwarmMaxStreamPeerServersFlag, SwarmLightNodeEnabled, SwarmDeliverySkipCheckFlag, SwarmListenAddrFlag, diff --git a/swarm/api/config.go b/swarm/api/config.go index e753890e4..be7385408 100644 --- a/swarm/api/config.go +++ b/swarm/api/config.go @@ -50,26 +50,27 @@ type Config struct { Swap *swap.LocalProfile Pss *pss.PssParams //*network.SyncParams - Contract common.Address - EnsRoot common.Address - EnsAPIs []string - Path string - ListenAddr string - Port string - PublicKey string - BzzKey string - NodeID string - NetworkID uint64 - SwapEnabled bool - SyncEnabled bool - SyncingSkipCheck bool - DeliverySkipCheck bool - LightNodeEnabled bool - SyncUpdateDelay time.Duration - SwapAPI string - Cors string - BzzAccount string - privateKey *ecdsa.PrivateKey + Contract common.Address + EnsRoot common.Address + EnsAPIs []string + Path string + ListenAddr string + Port string + PublicKey string + BzzKey string + NodeID string + NetworkID uint64 + SwapEnabled bool + SyncEnabled bool + SyncingSkipCheck bool + DeliverySkipCheck bool + MaxStreamPeerServers int + LightNodeEnabled bool + SyncUpdateDelay time.Duration + SwapAPI string + Cors string + BzzAccount string + privateKey *ecdsa.PrivateKey } //create a default config with all parameters to set to defaults @@ -80,20 +81,21 @@ func NewConfig() (c *Config) { FileStoreParams: storage.NewFileStoreParams(), HiveParams: network.NewHiveParams(), //SyncParams: network.NewDefaultSyncParams(), - Swap: swap.NewDefaultSwapParams(), - Pss: pss.NewPssParams(), - ListenAddr: DefaultHTTPListenAddr, - Port: DefaultHTTPPort, - Path: node.DefaultDataDir(), - EnsAPIs: nil, - EnsRoot: ens.TestNetAddress, - NetworkID: network.DefaultNetworkID, - SwapEnabled: false, - SyncEnabled: true, - SyncingSkipCheck: false, - DeliverySkipCheck: true, - SyncUpdateDelay: 15 * time.Second, - SwapAPI: "", + Swap: swap.NewDefaultSwapParams(), + Pss: pss.NewPssParams(), + ListenAddr: DefaultHTTPListenAddr, + Port: DefaultHTTPPort, + Path: node.DefaultDataDir(), + EnsAPIs: nil, + EnsRoot: ens.TestNetAddress, + NetworkID: network.DefaultNetworkID, + SwapEnabled: false, + SyncEnabled: true, + SyncingSkipCheck: false, + MaxStreamPeerServers: 10000, + DeliverySkipCheck: true, + SyncUpdateDelay: 15 * time.Second, + SwapAPI: "", } return diff --git a/swarm/network/stream/common_test.go b/swarm/network/stream/common_test.go index deb7e9815..72fdb2bd9 100644 --- a/swarm/network/stream/common_test.go +++ b/swarm/network/stream/common_test.go @@ -84,7 +84,7 @@ func createGlobalStore() (string, *mockdb.GlobalStore, error) { return globalStoreDir, globalStore, nil } -func newStreamerTester(t *testing.T) (*p2ptest.ProtocolTester, *Registry, *storage.LocalStore, func(), error) { +func newStreamerTester(t *testing.T, registryOptions *RegistryOptions) (*p2ptest.ProtocolTester, *Registry, *storage.LocalStore, func(), error) { // setup addr := network.RandomAddr() // tested peers peer address to := network.NewKademlia(addr.OAddr, network.NewKadParams()) @@ -114,7 +114,7 @@ func newStreamerTester(t *testing.T) (*p2ptest.ProtocolTester, *Registry, *stora delivery := NewDelivery(to, netStore) netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New - streamer := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), nil) + streamer := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), registryOptions) teardown := func() { streamer.Close() removeDataDir() diff --git a/swarm/network/stream/delivery_test.go b/swarm/network/stream/delivery_test.go index 9fb90eeba..b021b8771 100644 --- a/swarm/network/stream/delivery_test.go +++ b/swarm/network/stream/delivery_test.go @@ -39,7 +39,7 @@ import ( ) func TestStreamerRetrieveRequest(t *testing.T) { - tester, streamer, _, teardown, err := newStreamerTester(t) + tester, streamer, _, teardown, err := newStreamerTester(t, nil) defer teardown() if err != nil { t.Fatal(err) @@ -75,7 +75,7 @@ func TestStreamerRetrieveRequest(t *testing.T) { } func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) { - tester, streamer, _, teardown, err := newStreamerTester(t) + tester, streamer, _, teardown, err := newStreamerTester(t, nil) defer teardown() if err != nil { t.Fatal(err) @@ -127,7 +127,7 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) { // upstream request server receives a retrieve Request and responds with // offered hashes or delivery if skipHash is set to true func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) { - tester, streamer, localStore, teardown, err := newStreamerTester(t) + tester, streamer, localStore, teardown, err := newStreamerTester(t, nil) defer teardown() if err != nil { t.Fatal(err) @@ -221,7 +221,7 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) { } func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) { - tester, streamer, localStore, teardown, err := newStreamerTester(t) + tester, streamer, localStore, teardown, err := newStreamerTester(t, nil) defer teardown() if err != nil { t.Fatal(err) diff --git a/swarm/network/stream/messages.go b/swarm/network/stream/messages.go index 117f88044..74c785d58 100644 --- a/swarm/network/stream/messages.go +++ b/swarm/network/stream/messages.go @@ -84,11 +84,13 @@ func (p *Peer) handleSubscribeMsg(ctx context.Context, req *SubscribeMsg) (err e defer func() { if err != nil { - if e := p.Send(context.TODO(), SubscribeErrorMsg{ + // The error will be sent as a subscribe error message + // and will not be returned as it will prevent any new message + // exchange between peers over p2p. Instead, error will be returned + // only if there is one from sending subscribe error message. + err = p.Send(context.TODO(), SubscribeErrorMsg{ Error: err.Error(), - }); e != nil { - log.Error("send stream subscribe error message", "err", err) - } + }) } }() diff --git a/swarm/network/stream/peer.go b/swarm/network/stream/peer.go index 5fdaa7b87..ef6bbdf70 100644 --- a/swarm/network/stream/peer.go +++ b/swarm/network/stream/peer.go @@ -18,6 +18,7 @@ package stream import ( "context" + "errors" "fmt" "sync" "time" @@ -46,6 +47,10 @@ func (e *notFoundError) Error() string { return fmt.Sprintf("%s not found for stream %q", e.t, e.s) } +// ErrMaxPeerServers will be returned if peer server limit is reached. +// It will be sent in the SubscribeErrorMsg. +var ErrMaxPeerServers = errors.New("max peer servers") + // Peer is the Peer extension for the streaming protocol type Peer struct { *protocols.Peer @@ -204,6 +209,11 @@ func (p *Peer) setServer(s Stream, o Server, priority uint8) (*server, error) { if p.servers[s] != nil { return nil, fmt.Errorf("server %s already registered", s) } + + if p.streamer.maxPeerServers > 0 && len(p.servers) >= p.streamer.maxPeerServers { + return nil, ErrMaxPeerServers + } + os := &server{ Server: o, stream: s, @@ -346,6 +356,7 @@ func (p *Peer) removeClient(s Stream) error { return newNotFoundError("client", s) } client.close() + delete(p.clients, s) return nil } diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go index 65b8dff5a..1eda06c6a 100644 --- a/swarm/network/stream/stream.go +++ b/swarm/network/stream/stream.go @@ -60,6 +60,7 @@ type Registry struct { delivery *Delivery intervalsStore state.Store doRetrieve bool + maxPeerServers int } // RegistryOptions holds optional values for NewRegistry constructor. @@ -68,6 +69,7 @@ type RegistryOptions struct { DoSync bool DoRetrieve bool SyncUpdateDelay time.Duration + MaxPeerServers int // The limit of servers for each peer in registry } // NewRegistry is Streamer constructor @@ -87,6 +89,7 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy delivery: delivery, intervalsStore: intervalsStore, doRetrieve: options.DoRetrieve, + maxPeerServers: options.MaxPeerServers, } streamer.api = NewAPI(streamer) delivery.getPeer = streamer.getPeer diff --git a/swarm/network/stream/streamer_test.go b/swarm/network/stream/streamer_test.go index 0390a7b9b..0bdebefa7 100644 --- a/swarm/network/stream/streamer_test.go +++ b/swarm/network/stream/streamer_test.go @@ -20,6 +20,7 @@ import ( "bytes" "context" "errors" + "strconv" "testing" "time" @@ -28,7 +29,7 @@ import ( ) func TestStreamerSubscribe(t *testing.T) { - tester, streamer, _, teardown, err := newStreamerTester(t) + tester, streamer, _, teardown, err := newStreamerTester(t, nil) defer teardown() if err != nil { t.Fatal(err) @@ -42,7 +43,7 @@ func TestStreamerSubscribe(t *testing.T) { } func TestStreamerRequestSubscription(t *testing.T) { - tester, streamer, _, teardown, err := newStreamerTester(t) + tester, streamer, _, teardown, err := newStreamerTester(t, nil) defer teardown() if err != nil { t.Fatal(err) @@ -127,7 +128,7 @@ func (self *testServer) Close() { } func TestStreamerDownstreamSubscribeUnsubscribeMsgExchange(t *testing.T) { - tester, streamer, _, teardown, err := newStreamerTester(t) + tester, streamer, _, teardown, err := newStreamerTester(t, nil) defer teardown() if err != nil { t.Fatal(err) @@ -220,7 +221,7 @@ func TestStreamerDownstreamSubscribeUnsubscribeMsgExchange(t *testing.T) { } func TestStreamerUpstreamSubscribeUnsubscribeMsgExchange(t *testing.T) { - tester, streamer, _, teardown, err := newStreamerTester(t) + tester, streamer, _, teardown, err := newStreamerTester(t, nil) defer teardown() if err != nil { t.Fatal(err) @@ -287,7 +288,7 @@ func TestStreamerUpstreamSubscribeUnsubscribeMsgExchange(t *testing.T) { } func TestStreamerUpstreamSubscribeUnsubscribeMsgExchangeLive(t *testing.T) { - tester, streamer, _, teardown, err := newStreamerTester(t) + tester, streamer, _, teardown, err := newStreamerTester(t, nil) defer teardown() if err != nil { t.Fatal(err) @@ -353,7 +354,7 @@ func TestStreamerUpstreamSubscribeUnsubscribeMsgExchangeLive(t *testing.T) { } func TestStreamerUpstreamSubscribeErrorMsgExchange(t *testing.T) { - tester, streamer, _, teardown, err := newStreamerTester(t) + tester, streamer, _, teardown, err := newStreamerTester(t, nil) defer teardown() if err != nil { t.Fatal(err) @@ -397,7 +398,7 @@ func TestStreamerUpstreamSubscribeErrorMsgExchange(t *testing.T) { } func TestStreamerUpstreamSubscribeLiveAndHistory(t *testing.T) { - tester, streamer, _, teardown, err := newStreamerTester(t) + tester, streamer, _, teardown, err := newStreamerTester(t, nil) defer teardown() if err != nil { t.Fatal(err) @@ -462,7 +463,7 @@ func TestStreamerUpstreamSubscribeLiveAndHistory(t *testing.T) { } func TestStreamerDownstreamCorruptHashesMsgExchange(t *testing.T) { - tester, streamer, _, teardown, err := newStreamerTester(t) + tester, streamer, _, teardown, err := newStreamerTester(t, nil) defer teardown() if err != nil { t.Fatal(err) @@ -527,7 +528,7 @@ func TestStreamerDownstreamCorruptHashesMsgExchange(t *testing.T) { } func TestStreamerDownstreamOfferedHashesMsgExchange(t *testing.T) { - tester, streamer, _, teardown, err := newStreamerTester(t) + tester, streamer, _, teardown, err := newStreamerTester(t, nil) defer teardown() if err != nil { t.Fatal(err) @@ -626,7 +627,7 @@ func TestStreamerDownstreamOfferedHashesMsgExchange(t *testing.T) { } func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) { - tester, streamer, _, teardown, err := newStreamerTester(t) + tester, streamer, _, teardown, err := newStreamerTester(t, nil) defer teardown() if err != nil { t.Fatal(err) @@ -752,3 +753,165 @@ func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) { t.Fatal(err) } } + +// TestMaxPeerServersWithUnsubscribe creates a registry with a limited +// number of stream servers, and performs a test with subscriptions and +// unsubscriptions, checking if unsubscriptions will remove streams, +// leaving place for new streams. +func TestMaxPeerServersWithUnsubscribe(t *testing.T) { + var maxPeerServers = 6 + tester, streamer, _, teardown, err := newStreamerTester(t, &RegistryOptions{ + MaxPeerServers: maxPeerServers, + }) + defer teardown() + if err != nil { + t.Fatal(err) + } + + streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) { + return newTestServer(t), nil + }) + + node := tester.Nodes[0] + + for i := 0; i < maxPeerServers+10; i++ { + stream := NewStream("foo", strconv.Itoa(i), true) + + err = tester.TestExchanges(p2ptest.Exchange{ + Label: "Subscribe message", + Triggers: []p2ptest.Trigger{ + { + Code: 4, + Msg: &SubscribeMsg{ + Stream: stream, + Priority: Top, + }, + Peer: node.ID(), + }, + }, + Expects: []p2ptest.Expect{ + { + Code: 1, + Msg: &OfferedHashesMsg{ + Stream: stream, + HandoverProof: &HandoverProof{ + Handover: &Handover{}, + }, + Hashes: make([]byte, HashSize), + From: 1, + To: 1, + }, + Peer: node.ID(), + }, + }, + }) + + if err != nil { + t.Fatal(err) + } + + err = tester.TestExchanges(p2ptest.Exchange{ + Label: "unsubscribe message", + Triggers: []p2ptest.Trigger{ + { + Code: 0, + Msg: &UnsubscribeMsg{ + Stream: stream, + }, + Peer: node.ID(), + }, + }, + }) + + if err != nil { + t.Fatal(err) + } + } +} + +// TestMaxPeerServersWithoutUnsubscribe creates a registry with a limited +// number of stream servers, and performs subscriptions to detect subscriptions +// error message exchange. +func TestMaxPeerServersWithoutUnsubscribe(t *testing.T) { + var maxPeerServers = 6 + tester, streamer, _, teardown, err := newStreamerTester(t, &RegistryOptions{ + MaxPeerServers: maxPeerServers, + }) + defer teardown() + if err != nil { + t.Fatal(err) + } + + streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) { + return newTestServer(t), nil + }) + + node := tester.Nodes[0] + + for i := 0; i < maxPeerServers+10; i++ { + stream := NewStream("foo", strconv.Itoa(i), true) + + if i >= maxPeerServers { + err = tester.TestExchanges(p2ptest.Exchange{ + Label: "Subscribe message", + Triggers: []p2ptest.Trigger{ + { + Code: 4, + Msg: &SubscribeMsg{ + Stream: stream, + Priority: Top, + }, + Peer: node.ID(), + }, + }, + Expects: []p2ptest.Expect{ + { + Code: 7, + Msg: &SubscribeErrorMsg{ + Error: ErrMaxPeerServers.Error(), + }, + Peer: node.ID(), + }, + }, + }) + + if err != nil { + t.Fatal(err) + } + continue + } + + err = tester.TestExchanges(p2ptest.Exchange{ + Label: "Subscribe message", + Triggers: []p2ptest.Trigger{ + { + Code: 4, + Msg: &SubscribeMsg{ + Stream: stream, + Priority: Top, + }, + Peer: node.ID(), + }, + }, + Expects: []p2ptest.Expect{ + { + Code: 1, + Msg: &OfferedHashesMsg{ + Stream: stream, + HandoverProof: &HandoverProof{ + Handover: &Handover{}, + }, + Hashes: make([]byte, HashSize), + From: 1, + To: 1, + }, + Peer: node.ID(), + }, + }, + }) + + if err != nil { + t.Fatal(err) + } + } +} diff --git a/swarm/swarm.go b/swarm/swarm.go index 8b2661529..0cd56d4eb 100644 --- a/swarm/swarm.go +++ b/swarm/swarm.go @@ -180,6 +180,7 @@ func NewSwarm(config *api.Config, mockStore *mock.NodeStore) (self *Swarm, err e DoSync: config.SyncEnabled, DoRetrieve: true, SyncUpdateDelay: config.SyncUpdateDelay, + MaxPeerServers: config.MaxStreamPeerServers, }) // Swarm Hash Merklised Chunking for Arbitrary-length Document/File storage