From 993b145f25845e50e8af41ffb1116eaee381d693 Mon Sep 17 00:00:00 2001 From: Anton Evangelatov Date: Thu, 11 Apr 2019 10:26:52 +0200 Subject: [PATCH] swarm/storage/localstore: fix export db.Put signature cmd/swarm/swarm-smoke: improve smoke tests (#1337) swarm/network: remove dead code (#1339) swarm/network: remove FetchStore and SyncChunkStore in favor of NetStore (#1342) --- cmd/swarm/swarm-smoke/feed_upload_and_sync.go | 8 +- cmd/swarm/swarm-smoke/main.go | 33 ++- cmd/swarm/swarm-smoke/sliding_window.go | 10 +- cmd/swarm/swarm-smoke/upload_and_sync.go | 236 +++++++++++------- cmd/swarm/swarm-smoke/upload_speed.go | 8 +- cmd/swarm/swarm-smoke/util.go | 18 +- p2p/protocols/protocol.go | 4 +- p2p/protocols/protocol_test.go | 2 +- p2p/testing/peerpool.go | 2 +- swarm/api/inspector.go | 35 ++- swarm/chunk/chunk.go | 6 - swarm/network/hive.go | 2 +- swarm/network/kademlia.go | 15 +- swarm/network/kademlia_test.go | 2 +- swarm/network/stream/delivery.go | 149 +++-------- swarm/network/stream/delivery_test.go | 162 +----------- swarm/network/stream/intervals_test.go | 10 +- swarm/network/stream/lightnode_test.go | 89 +------ swarm/network/stream/messages.go | 4 +- swarm/network/stream/peer.go | 2 +- .../network/stream/snapshot_retrieval_test.go | 1 - swarm/network/stream/snapshot_sync_test.go | 1 - swarm/network/stream/stream.go | 124 ++++----- swarm/network/stream/streamer_test.go | 21 +- swarm/network/stream/syncer.go | 82 ++---- swarm/network/stream/syncer_test.go | 7 +- swarm/storage/localstore/export.go | 2 +- swarm/swarm.go | 6 - 28 files changed, 378 insertions(+), 663 deletions(-) diff --git a/cmd/swarm/swarm-smoke/feed_upload_and_sync.go b/cmd/swarm/swarm-smoke/feed_upload_and_sync.go index 6b3fed0c7..b5ffc43d2 100644 --- a/cmd/swarm/swarm-smoke/feed_upload_and_sync.go +++ b/cmd/swarm/swarm-smoke/feed_upload_and_sync.go @@ -26,11 +26,11 @@ const ( feedRandomDataLength = 8 ) -func feedUploadAndSyncCmd(ctx *cli.Context, tuid string) error { +func feedUploadAndSyncCmd(ctx *cli.Context) error { errc := make(chan error) go func() { - errc <- feedUploadAndSync(ctx, tuid) + errc <- feedUploadAndSync(ctx) }() select { @@ -46,7 +46,7 @@ func feedUploadAndSyncCmd(ctx *cli.Context, tuid string) error { } } -func feedUploadAndSync(c *cli.Context, tuid string) error { +func feedUploadAndSync(c *cli.Context) error { log.Info("generating and uploading feeds to " + httpEndpoint(hosts[0]) + " and syncing") // create a random private key to sign updates with and derive the address @@ -272,7 +272,7 @@ func feedUploadAndSync(c *cli.Context, tuid string) error { ruid := uuid.New()[:8] go func(url string, endpoint string, ruid string) { for { - err := fetch(url, endpoint, fileHash, ruid, "") + err := fetch(url, endpoint, fileHash, ruid) if err != nil { continue } diff --git a/cmd/swarm/swarm-smoke/main.go b/cmd/swarm/swarm-smoke/main.go index 860fbcc1d..2c1dd65a0 100644 --- a/cmd/swarm/swarm-smoke/main.go +++ b/cmd/swarm/swarm-smoke/main.go @@ -37,17 +37,17 @@ var ( ) var ( - allhosts string - hosts []string - filesize int - inputSeed int - syncDelay int - httpPort int - wsPort int - verbosity int - timeout int - single bool - trackTimeout int + allhosts string + hosts []string + filesize int + syncDelay int + inputSeed int + httpPort int + wsPort int + verbosity int + timeout int + single bool + onlyUpload bool ) func main() { @@ -101,7 +101,7 @@ func main() { }, cli.IntFlag{ Name: "timeout", - Value: 120, + Value: 180, Usage: "timeout in seconds after which kill the process", Destination: &timeout, }, @@ -110,11 +110,10 @@ func main() { Usage: "whether to fetch content from a single node or from all nodes", Destination: &single, }, - cli.IntFlag{ - Name: "track-timeout", - Value: 5, - Usage: "timeout in seconds to wait for GetAllReferences to return", - Destination: &trackTimeout, + cli.BoolFlag{ + Name: "only-upload", + Usage: "whether to only upload content to a single node without fetching", + Destination: &onlyUpload, }, } diff --git a/cmd/swarm/swarm-smoke/sliding_window.go b/cmd/swarm/swarm-smoke/sliding_window.go index d589124bd..ab082c543 100644 --- a/cmd/swarm/swarm-smoke/sliding_window.go +++ b/cmd/swarm/swarm-smoke/sliding_window.go @@ -35,11 +35,11 @@ type uploadResult struct { digest []byte } -func slidingWindowCmd(ctx *cli.Context, tuid string) error { +func slidingWindowCmd(ctx *cli.Context) error { errc := make(chan error) go func() { - errc <- slidingWindow(ctx, tuid) + errc <- slidingWindow(ctx) }() err := <-errc @@ -49,10 +49,10 @@ func slidingWindowCmd(ctx *cli.Context, tuid string) error { return err } -func slidingWindow(ctx *cli.Context, tuid string) error { +func slidingWindow(ctx *cli.Context) error { var hashes []uploadResult //swarm hashes of the uploads nodes := len(hosts) - log.Info("sliding window test started", "tuid", tuid, "nodes", nodes, "filesize(kb)", filesize, "timeout", timeout) + log.Info("sliding window test started", "nodes", nodes, "filesize(kb)", filesize, "timeout", timeout) uploadedBytes := 0 networkDepth := 0 errored := false @@ -107,7 +107,7 @@ outer: start = time.Now() // fetch hangs when swarm dies out, so we have to jump through a bit more hoops to actually // catch the timeout, but also allow this retry logic - err := fetch(v.hash, httpEndpoint(hosts[idx]), v.digest, ruid, "") + err := fetch(v.hash, httpEndpoint(hosts[idx]), v.digest, ruid) if err != nil { log.Error("error fetching hash", "err", err) continue diff --git a/cmd/swarm/swarm-smoke/upload_and_sync.go b/cmd/swarm/swarm-smoke/upload_and_sync.go index 6c20a4fa6..6a434a0b2 100644 --- a/cmd/swarm/swarm-smoke/upload_and_sync.go +++ b/cmd/swarm/swarm-smoke/upload_and_sync.go @@ -23,22 +23,20 @@ import ( "io/ioutil" "math/rand" "os" - "strings" "sync" + "sync/atomic" "time" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/rpc" - "github.com/ethereum/go-ethereum/swarm/api" "github.com/ethereum/go-ethereum/swarm/storage" "github.com/ethereum/go-ethereum/swarm/testutil" - "github.com/pborman/uuid" cli "gopkg.in/urfave/cli.v1" ) -func uploadAndSyncCmd(ctx *cli.Context, tuid string) error { +func uploadAndSyncCmd(ctx *cli.Context) error { // use input seed if it has been set if inputSeed != 0 { seed = inputSeed @@ -49,7 +47,7 @@ func uploadAndSyncCmd(ctx *cli.Context, tuid string) error { errc := make(chan error) go func() { - errc <- uploadAndSync(ctx, randomBytes, tuid) + errc <- uploadAndSync(ctx, randomBytes) }() var err error @@ -65,7 +63,7 @@ func uploadAndSyncCmd(ctx *cli.Context, tuid string) error { } // trigger debug functionality on randomBytes - e := trackChunks(randomBytes[:]) + e := trackChunks(randomBytes[:], true) if e != nil { log.Error(e.Error()) } @@ -73,50 +71,84 @@ func uploadAndSyncCmd(ctx *cli.Context, tuid string) error { return err } -func trackChunks(testData []byte) error { +func trackChunks(testData []byte, submitMetrics bool) error { addrs, err := getAllRefs(testData) if err != nil { return err } for i, ref := range addrs { - log.Trace(fmt.Sprintf("ref %d", i), "ref", ref) + log.Debug(fmt.Sprintf("ref %d", i), "ref", ref) } + var globalYes, globalNo int + var globalMu sync.Mutex + var hasErr bool + + var wg sync.WaitGroup + wg.Add(len(hosts)) + for _, host := range hosts { - httpHost := fmt.Sprintf("ws://%s:%d", host, 8546) + host := host + go func() { + defer wg.Done() + httpHost := fmt.Sprintf("ws://%s:%d", host, 8546) - hostChunks := []string{} - - rpcClient, err := rpc.Dial(httpHost) - if err != nil { - log.Error("error dialing host", "err", err, "host", httpHost) - continue - } - - var hasInfo []api.HasInfo - err = rpcClient.Call(&hasInfo, "bzz_has", addrs) - if err != nil { - log.Error("error calling rpc client", "err", err, "host", httpHost) - continue - } - - count := 0 - for _, info := range hasInfo { - if info.Has { - hostChunks = append(hostChunks, "1") - } else { - hostChunks = append(hostChunks, "0") - count++ + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + rpcClient, err := rpc.DialContext(ctx, httpHost) + if rpcClient != nil { + defer rpcClient.Close() + } + if err != nil { + log.Error("error dialing host", "err", err, "host", httpHost) + hasErr = true + return } - } - if count == 0 { - log.Info("host reported to have all chunks", "host", host) - } + var hostChunks string + err = rpcClient.Call(&hostChunks, "bzz_has", addrs) + if err != nil { + log.Error("error calling rpc client", "err", err, "host", httpHost) + hasErr = true + return + } - log.Trace("chunks", "chunks", strings.Join(hostChunks, ""), "host", host) + yes, no := 0, 0 + for _, val := range hostChunks { + if val == '1' { + yes++ + } else { + no++ + } + } + + if no == 0 { + log.Info("host reported to have all chunks", "host", host) + } + + log.Debug("chunks", "chunks", hostChunks, "yes", yes, "no", no, "host", host) + + if submitMetrics { + globalMu.Lock() + globalYes += yes + globalNo += no + globalMu.Unlock() + } + }() } + + wg.Wait() + + if !hasErr && submitMetrics { + // remove the chunks stored on the uploader node + globalYes -= len(addrs) + + metrics.GetOrRegisterCounter("deployment.chunks.yes", nil).Inc(int64(globalYes)) + metrics.GetOrRegisterCounter("deployment.chunks.no", nil).Inc(int64(globalNo)) + metrics.GetOrRegisterCounter("deployment.chunks.refs", nil).Inc(int64(len(addrs))) + } + return nil } @@ -130,15 +162,13 @@ func getAllRefs(testData []byte) (storage.AddressCollection, error) { if err != nil { return nil, err } - ctx, cancel := context.WithTimeout(context.Background(), time.Duration(trackTimeout)*time.Second) - defer cancel() reader := bytes.NewReader(testData) - return fileStore.GetAllReferences(ctx, reader, false) + return fileStore.GetAllReferences(context.Background(), reader, false) } -func uploadAndSync(c *cli.Context, randomBytes []byte, tuid string) error { - log.Info("uploading to "+httpEndpoint(hosts[0])+" and syncing", "tuid", tuid, "seed", seed) +func uploadAndSync(c *cli.Context, randomBytes []byte) error { + log.Info("uploading to "+httpEndpoint(hosts[0])+" and syncing", "seed", seed) t1 := time.Now() hash, err := upload(randomBytes, httpEndpoint(hosts[0])) @@ -155,53 +185,91 @@ func uploadAndSync(c *cli.Context, randomBytes []byte, tuid string) error { return err } - log.Info("uploaded successfully", "tuid", tuid, "hash", hash, "took", t2, "digest", fmt.Sprintf("%x", fhash)) + log.Info("uploaded successfully", "hash", hash, "took", t2, "digest", fmt.Sprintf("%x", fhash)) - time.Sleep(time.Duration(syncDelay) * time.Second) + waitToSync() - wg := sync.WaitGroup{} - if single { - randIndex := 1 + rand.Intn(len(hosts)-1) - ruid := uuid.New()[:8] - wg.Add(1) - go func(endpoint string, ruid string) { - for { - start := time.Now() - err := fetch(hash, endpoint, fhash, ruid, tuid) - if err != nil { - continue - } - ended := time.Since(start) + log.Debug("chunks before fetch attempt", "hash", hash) - metrics.GetOrRegisterResettingTimer("upload-and-sync.single.fetch-time", nil).Update(ended) - log.Info("fetch successful", "tuid", tuid, "ruid", ruid, "took", ended, "endpoint", endpoint) - wg.Done() - return - } - }(httpEndpoint(hosts[randIndex]), ruid) - } else { - for _, endpoint := range hosts[1:] { - ruid := uuid.New()[:8] - wg.Add(1) - go func(endpoint string, ruid string) { - for { - start := time.Now() - err := fetch(hash, endpoint, fhash, ruid, tuid) - if err != nil { - continue - } - ended := time.Since(start) - - metrics.GetOrRegisterResettingTimer("upload-and-sync.each.fetch-time", nil).Update(ended) - log.Info("fetch successful", "tuid", tuid, "ruid", ruid, "took", ended, "endpoint", endpoint) - wg.Done() - return - } - }(httpEndpoint(endpoint), ruid) - } + err = trackChunks(randomBytes, false) + if err != nil { + log.Error(err.Error()) + } + + if onlyUpload { + log.Debug("only-upload is true, stoppping test", "hash", hash) + return nil + } + + randIndex := 1 + rand.Intn(len(hosts)-1) + + for { + start := time.Now() + err := fetch(hash, httpEndpoint(hosts[randIndex]), fhash, "") + if err != nil { + time.Sleep(2 * time.Second) + continue + } + ended := time.Since(start) + + metrics.GetOrRegisterResettingTimer("upload-and-sync.single.fetch-time", nil).Update(ended) + log.Info("fetch successful", "took", ended, "endpoint", httpEndpoint(hosts[randIndex])) + break } - wg.Wait() - log.Info("all hosts synced random file successfully") return nil } + +func isSyncing(wsHost string) (bool, error) { + rpcClient, err := rpc.Dial(wsHost) + if rpcClient != nil { + defer rpcClient.Close() + } + + if err != nil { + log.Error("error dialing host", "err", err) + return false, err + } + + var isSyncing bool + err = rpcClient.Call(&isSyncing, "bzz_isSyncing") + if err != nil { + log.Error("error calling host for isSyncing", "err", err) + return false, err + } + + log.Debug("isSyncing result", "host", wsHost, "isSyncing", isSyncing) + + return isSyncing, nil +} + +func waitToSync() { + t1 := time.Now() + + ns := uint64(1) + + for ns > 0 { + time.Sleep(3 * time.Second) + + notSynced := uint64(0) + var wg sync.WaitGroup + wg.Add(len(hosts)) + for i := 0; i < len(hosts); i++ { + i := i + go func(idx int) { + stillSyncing, err := isSyncing(wsEndpoint(hosts[idx])) + + if stillSyncing || err != nil { + atomic.AddUint64(¬Synced, 1) + } + wg.Done() + }(i) + } + wg.Wait() + + ns = atomic.LoadUint64(¬Synced) + } + + t2 := time.Since(t1) + metrics.GetOrRegisterResettingTimer("upload-and-sync.single.wait-for-sync.deployment", nil).Update(t2) +} diff --git a/cmd/swarm/swarm-smoke/upload_speed.go b/cmd/swarm/swarm-smoke/upload_speed.go index 20bf7b86c..047ea0092 100644 --- a/cmd/swarm/swarm-smoke/upload_speed.go +++ b/cmd/swarm/swarm-smoke/upload_speed.go @@ -28,14 +28,14 @@ import ( cli "gopkg.in/urfave/cli.v1" ) -func uploadSpeedCmd(ctx *cli.Context, tuid string) error { - log.Info("uploading to "+hosts[0], "tuid", tuid, "seed", seed) +func uploadSpeedCmd(ctx *cli.Context) error { + log.Info("uploading to "+hosts[0], "seed", seed) randomBytes := testutil.RandomBytes(seed, filesize*1000) errc := make(chan error) go func() { - errc <- uploadSpeed(ctx, tuid, randomBytes) + errc <- uploadSpeed(ctx, randomBytes) }() select { @@ -53,7 +53,7 @@ func uploadSpeedCmd(ctx *cli.Context, tuid string) error { } } -func uploadSpeed(c *cli.Context, tuid string, data []byte) error { +func uploadSpeed(c *cli.Context, data []byte) error { t1 := time.Now() hash, err := upload(data, hosts[0]) if err != nil { diff --git a/cmd/swarm/swarm-smoke/util.go b/cmd/swarm/swarm-smoke/util.go index 87abb44b0..b95f993e8 100644 --- a/cmd/swarm/swarm-smoke/util.go +++ b/cmd/swarm/swarm-smoke/util.go @@ -38,7 +38,6 @@ import ( "github.com/ethereum/go-ethereum/swarm/api/client" "github.com/ethereum/go-ethereum/swarm/spancontext" opentracing "github.com/opentracing/opentracing-go" - "github.com/pborman/uuid" cli "gopkg.in/urfave/cli.v1" ) @@ -59,28 +58,25 @@ func wsEndpoint(host string) string { return fmt.Sprintf("ws://%s:%d", host, wsPort) } -func wrapCliCommand(name string, command func(*cli.Context, string) error) func(*cli.Context) error { +func wrapCliCommand(name string, command func(*cli.Context) error) func(*cli.Context) error { return func(ctx *cli.Context) error { log.PrintOrigins(true) log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(verbosity), log.StreamHandler(os.Stdout, log.TerminalFormat(false)))) - // test uuid - tuid := uuid.New()[:8] - commandName = name hosts = strings.Split(allhosts, ",") defer func(now time.Time) { totalTime := time.Since(now) - log.Info("total time", "tuid", tuid, "time", totalTime, "kb", filesize) + log.Info("total time", "time", totalTime, "kb", filesize) metrics.GetOrRegisterResettingTimer(name+".total-time", nil).Update(totalTime) }(time.Now()) - log.Info("smoke test starting", "tuid", tuid, "task", name, "timeout", timeout) + log.Info("smoke test starting", "task", name, "timeout", timeout) metrics.GetOrRegisterCounter(name, nil).Inc(1) - return command(ctx, tuid) + return command(ctx) } } @@ -142,11 +138,11 @@ func fetchFeed(topic string, user string, endpoint string, original []byte, ruid } // fetch is getting the requested `hash` from the `endpoint` and compares it with the `original` file -func fetch(hash string, endpoint string, original []byte, ruid string, tuid string) error { +func fetch(hash string, endpoint string, original []byte, ruid string) error { ctx, sp := spancontext.StartSpan(context.Background(), "upload-and-sync.fetch") defer sp.Finish() - log.Info("http get request", "tuid", tuid, "ruid", ruid, "endpoint", endpoint, "hash", hash) + log.Info("http get request", "ruid", ruid, "endpoint", endpoint, "hash", hash) var tn time.Time reqUri := endpoint + "/bzz:/" + hash + "/" @@ -170,7 +166,7 @@ func fetch(hash string, endpoint string, original []byte, ruid string, tuid stri log.Error(err.Error(), "ruid", ruid) return err } - log.Info("http get response", "tuid", tuid, "ruid", ruid, "endpoint", endpoint, "hash", hash, "code", res.StatusCode, "len", res.ContentLength) + log.Info("http get response", "ruid", ruid, "endpoint", endpoint, "hash", hash, "code", res.StatusCode, "len", res.ContentLength) if res.StatusCode != 200 { err := fmt.Errorf("expected status code %d, got %v", 200, res.StatusCode) diff --git a/p2p/protocols/protocol.go b/p2p/protocols/protocol.go index 1600a11f9..164e3fa4b 100644 --- a/p2p/protocols/protocol.go +++ b/p2p/protocols/protocol.go @@ -243,7 +243,7 @@ func (p *Peer) Run(handler func(ctx context.Context, msg interface{}) error) err // Drop disconnects a peer. // TODO: may need to implement protocol drop only? don't want to kick off the peer // if they are useful for other protocols -func (p *Peer) Drop(err error) { +func (p *Peer) Drop() { p.Disconnect(p2p.DiscSubprotocolError) } @@ -291,7 +291,7 @@ func (p *Peer) Send(ctx context.Context, msg interface{}) error { if p.spec.Hook != nil { err := p.spec.Hook.Send(p, wmsg.Size, msg) if err != nil { - p.Drop(err) + p.Drop() return err } } diff --git a/p2p/protocols/protocol_test.go b/p2p/protocols/protocol_test.go index 9ac76ea2f..6d5ea8b92 100644 --- a/p2p/protocols/protocol_test.go +++ b/p2p/protocols/protocol_test.go @@ -126,7 +126,7 @@ func newProtocol(pp *p2ptest.TestPeerPool) func(*p2p.Peer, p2p.MsgReadWriter) er case *kill: // demonstrates use of peerPool, killing another peer connection as a response to a message id := msg.C - pp.Get(id).Drop(errors.New("killed")) + pp.Get(id).Drop() return nil case *drop: diff --git a/p2p/testing/peerpool.go b/p2p/testing/peerpool.go index 01ccce67e..09db4b246 100644 --- a/p2p/testing/peerpool.go +++ b/p2p/testing/peerpool.go @@ -26,7 +26,7 @@ import ( type TestPeer interface { ID() enode.ID - Drop(error) + Drop() } // TestPeerPool is an example peerPool to demonstrate registration of peer connections diff --git a/swarm/api/inspector.go b/swarm/api/inspector.go index 2ae6b4da8..c4151bf20 100644 --- a/swarm/api/inspector.go +++ b/swarm/api/inspector.go @@ -19,7 +19,11 @@ package api import ( "context" "fmt" + "strings" + "time" + "github.com/ethereum/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/swarm/log" "github.com/ethereum/go-ethereum/swarm/network" "github.com/ethereum/go-ethereum/swarm/storage" ) @@ -47,25 +51,34 @@ func (inspector *Inspector) ListKnown() []string { return res } -type HasInfo struct { - Addr string `json:"address"` - Has bool `json:"has"` +func (inspector *Inspector) IsSyncing() bool { + lastReceivedChunksMsg := metrics.GetOrRegisterGauge("network.stream.received_chunks", nil) + + // last received chunks msg time + lrct := time.Unix(0, lastReceivedChunksMsg.Value()) + + // if last received chunks msg time is after now-15sec. (i.e. within the last 15sec.) then we say that the node is still syncing + // technically this is not correct, because this might have been a retrieve request, but for the time being it works for our purposes + // because we know we are not making retrieve requests on the node while checking this + return lrct.After(time.Now().Add(-15 * time.Second)) } // Has checks whether each chunk address is present in the underlying datastore, // the bool in the returned structs indicates if the underlying datastore has // the chunk stored with the given address (true), or not (false) -func (inspector *Inspector) Has(chunkAddresses []storage.Address) []HasInfo { - results := make([]HasInfo, 0) +func (inspector *Inspector) Has(chunkAddresses []storage.Address) string { + hostChunks := []string{} for _, addr := range chunkAddresses { - res := HasInfo{} - res.Addr = addr.String() has, err := inspector.netStore.Has(context.Background(), addr) if err != nil { - has = false + log.Error(err.Error()) + } + if has { + hostChunks = append(hostChunks, "1") + } else { + hostChunks = append(hostChunks, "0") } - res.Has = has - results = append(results, res) } - return results + + return strings.Join(hostChunks, "") } diff --git a/swarm/chunk/chunk.go b/swarm/chunk/chunk.go index c8551814c..2455904f3 100644 --- a/swarm/chunk/chunk.go +++ b/swarm/chunk/chunk.go @@ -172,12 +172,6 @@ type Store interface { Close() (err error) } -// FetchStore is a Store which supports syncing -type FetchStore interface { - Store - FetchFunc(ctx context.Context, addr Address) func(context.Context) error -} - // Validator validates a chunk. type Validator interface { Validate(ch Chunk) bool diff --git a/swarm/network/hive.go b/swarm/network/hive.go index 2eb521f1d..ad51b29c2 100644 --- a/swarm/network/hive.go +++ b/swarm/network/hive.go @@ -116,7 +116,7 @@ func (h *Hive) Stop() error { log.Info(fmt.Sprintf("%08x hive stopped, dropping peers", h.BaseAddr()[:4])) h.EachConn(nil, 255, func(p *Peer, _ int) bool { log.Info(fmt.Sprintf("%08x dropping peer %08x", h.BaseAddr()[:4], p.Address()[:4])) - p.Drop(nil) + p.Drop() return true }) diff --git a/swarm/network/kademlia.go b/swarm/network/kademlia.go index dd6de44fd..f553cb5f4 100644 --- a/swarm/network/kademlia.go +++ b/swarm/network/kademlia.go @@ -25,6 +25,7 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/swarm/log" "github.com/ethereum/go-ethereum/swarm/pot" sv "github.com/ethereum/go-ethereum/swarm/version" @@ -138,6 +139,9 @@ func (e *entry) Hex() string { func (k *Kademlia) Register(peers ...*BzzAddr) error { k.lock.Lock() defer k.lock.Unlock() + + metrics.GetOrRegisterCounter("kad.register", nil).Inc(1) + var known, size int for _, p := range peers { log.Trace("kademlia trying to register", "addr", p) @@ -164,8 +168,6 @@ func (k *Kademlia) Register(peers ...*BzzAddr) error { return newEntry(p) } - log.Trace("found among known peers, underlay addr is same, do nothing", "new", p, "old", e.BzzAddr) - return v }) if found { @@ -186,6 +188,9 @@ func (k *Kademlia) Register(peers ...*BzzAddr) error { func (k *Kademlia) SuggestPeer() (suggestedPeer *BzzAddr, saturationDepth int, changed bool) { k.lock.Lock() defer k.lock.Unlock() + + metrics.GetOrRegisterCounter("kad.suggestpeer", nil).Inc(1) + radius := neighbourhoodRadiusForPot(k.conns, k.NeighbourhoodSize, k.base) // collect undersaturated bins in ascending order of number of connected peers // and from shallow to deep (ascending order of PO) @@ -297,6 +302,9 @@ func (k *Kademlia) SuggestPeer() (suggestedPeer *BzzAddr, saturationDepth int, c func (k *Kademlia) On(p *Peer) (uint8, bool) { k.lock.Lock() defer k.lock.Unlock() + + metrics.GetOrRegisterCounter("kad.on", nil).Inc(1) + var ins bool k.conns, _, _, _ = pot.Swap(k.conns, p, Pof, func(v pot.Val) pot.Val { // if not found live @@ -320,7 +328,6 @@ func (k *Kademlia) On(p *Peer) (uint8, bool) { k.addrCountC <- k.addrs.Size() } } - log.Trace(k.string()) // calculate if depth of saturation changed depth := uint8(k.saturation()) var changed bool @@ -608,7 +615,7 @@ func (k *Kademlia) string() string { if len(sv.GitCommit) > 0 { rows = append(rows, fmt.Sprintf("commit hash: %s", sv.GitCommit)) } - rows = append(rows, fmt.Sprintf("%v KΛÐΞMLIΛ hive: queen's address: %x", time.Now().UTC().Format(time.UnixDate), k.BaseAddr()[:3])) + rows = append(rows, fmt.Sprintf("%v KΛÐΞMLIΛ hive: queen's address: %x", time.Now().UTC().Format(time.UnixDate), k.BaseAddr())) rows = append(rows, fmt.Sprintf("population: %d (%d), NeighbourhoodSize: %d, MinBinSize: %d, MaxBinSize: %d", k.conns.Size(), k.addrs.Size(), k.NeighbourhoodSize, k.MinBinSize, k.MaxBinSize)) liverows := make([]string, k.MaxProxDisplay) diff --git a/swarm/network/kademlia_test.go b/swarm/network/kademlia_test.go index b4663eee5..93b990138 100644 --- a/swarm/network/kademlia_test.go +++ b/swarm/network/kademlia_test.go @@ -541,7 +541,7 @@ func TestKademliaHiveString(t *testing.T) { tk.Register("10000000", "10000001") tk.MaxProxDisplay = 8 h := tk.String() - expH := "\n=========================================================================\nMon Feb 27 12:10:28 UTC 2017 KΛÐΞMLIΛ hive: queen's address: 000000\npopulation: 2 (4), NeighbourhoodSize: 2, MinBinSize: 2, MaxBinSize: 4\n============ DEPTH: 0 ==========================================\n000 0 | 2 8100 (0) 8000 (0)\n001 1 4000 | 1 4000 (0)\n002 1 2000 | 1 2000 (0)\n003 0 | 0\n004 0 | 0\n005 0 | 0\n006 0 | 0\n007 0 | 0\n=========================================================================" + expH := "\n=========================================================================\nMon Feb 27 12:10:28 UTC 2017 KΛÐΞMLIΛ hive: queen's address: 0000000000000000000000000000000000000000000000000000000000000000\npopulation: 2 (4), NeighbourhoodSize: 2, MinBinSize: 2, MaxBinSize: 4\n============ DEPTH: 0 ==========================================\n000 0 | 2 8100 (0) 8000 (0)\n001 1 4000 | 1 4000 (0)\n002 1 2000 | 1 2000 (0)\n003 0 | 0\n004 0 | 0\n005 0 | 0\n006 0 | 0\n007 0 | 0\n=========================================================================" if expH[104:] != h[104:] { t.Fatalf("incorrect hive output. expected %v, got %v", expH, h) } diff --git a/swarm/network/stream/delivery.go b/swarm/network/stream/delivery.go index 059666723..aa2c817ea 100644 --- a/swarm/network/stream/delivery.go +++ b/swarm/network/stream/delivery.go @@ -20,6 +20,7 @@ import ( "context" "errors" "fmt" + "time" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/p2p/enode" @@ -33,11 +34,6 @@ import ( olog "github.com/opentracing/opentracing-go/log" ) -const ( - swarmChunkServerStreamName = "RETRIEVE_REQUEST" - deliveryCap = 32 -) - var ( processReceivedChunksCount = metrics.NewRegisteredCounter("network.stream.received_chunks.count", nil) handleRetrieveRequestMsgCount = metrics.NewRegisteredCounter("network.stream.handle_retrieve_request_msg.count", nil) @@ -45,93 +41,25 @@ var ( requestFromPeersCount = metrics.NewRegisteredCounter("network.stream.request_from_peers.count", nil) requestFromPeersEachCount = metrics.NewRegisteredCounter("network.stream.request_from_peers_each.count", nil) + + lastReceivedChunksMsg = metrics.GetOrRegisterGauge("network.stream.received_chunks", nil) ) type Delivery struct { - chunkStore chunk.FetchStore - kad *network.Kademlia - getPeer func(enode.ID) *Peer + netStore *storage.NetStore + kad *network.Kademlia + getPeer func(enode.ID) *Peer + quit chan struct{} } -func NewDelivery(kad *network.Kademlia, chunkStore chunk.FetchStore) *Delivery { +func NewDelivery(kad *network.Kademlia, netStore *storage.NetStore) *Delivery { return &Delivery{ - chunkStore: chunkStore, - kad: kad, + netStore: netStore, + kad: kad, + quit: make(chan struct{}), } } -// SwarmChunkServer implements Server -type SwarmChunkServer struct { - deliveryC chan []byte - batchC chan []byte - chunkStore storage.ChunkStore - currentLen uint64 - quit chan struct{} -} - -// NewSwarmChunkServer is SwarmChunkServer constructor -func NewSwarmChunkServer(chunkStore storage.ChunkStore) *SwarmChunkServer { - s := &SwarmChunkServer{ - deliveryC: make(chan []byte, deliveryCap), - batchC: make(chan []byte), - chunkStore: chunkStore, - quit: make(chan struct{}), - } - go s.processDeliveries() - return s -} - -// processDeliveries handles delivered chunk hashes -func (s *SwarmChunkServer) processDeliveries() { - var hashes []byte - var batchC chan []byte - for { - select { - case <-s.quit: - return - case hash := <-s.deliveryC: - hashes = append(hashes, hash...) - batchC = s.batchC - case batchC <- hashes: - hashes = nil - batchC = nil - } - } -} - -// SessionIndex returns zero in all cases for SwarmChunkServer. -func (s *SwarmChunkServer) SessionIndex() (uint64, error) { - return 0, nil -} - -// SetNextBatch -func (s *SwarmChunkServer) SetNextBatch(_, _ uint64) (hashes []byte, from uint64, to uint64, proof *HandoverProof, err error) { - select { - case hashes = <-s.batchC: - case <-s.quit: - return - } - - from = s.currentLen - s.currentLen += uint64(len(hashes)) - to = s.currentLen - return -} - -// Close needs to be called on a stream server -func (s *SwarmChunkServer) Close() { - close(s.quit) -} - -// GetData retrieves chunk data from db store -func (s *SwarmChunkServer) GetData(ctx context.Context, key []byte) ([]byte, error) { - ch, err := s.chunkStore.Get(ctx, chunk.ModeGetRequest, storage.Address(key)) - if err != nil { - return nil, err - } - return ch.Data(), nil -} - // RetrieveRequestMsg is the protocol msg for chunk retrieve requests type RetrieveRequestMsg struct { Addr storage.Address @@ -150,12 +78,6 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req * osp.LogFields(olog.String("ref", req.Addr.String())) - s, err := sp.getServer(NewStream(swarmChunkServerStreamName, "", true)) - if err != nil { - return err - } - streamer := s.Server.(*SwarmChunkServer) - var cancel func() // TODO: do something with this hardcoded timeout, maybe use TTL in the future ctx = context.WithValue(ctx, "peer", sp.ID().String()) @@ -165,36 +87,26 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req * go func() { select { case <-ctx.Done(): - case <-streamer.quit: + case <-d.quit: } cancel() }() go func() { defer osp.Finish() - ch, err := d.chunkStore.Get(ctx, chunk.ModeGetRequest, req.Addr) + ch, err := d.netStore.Get(ctx, chunk.ModeGetRequest, req.Addr) if err != nil { retrieveChunkFail.Inc(1) log.Debug("ChunkStore.Get can not retrieve chunk", "peer", sp.ID().String(), "addr", req.Addr, "hopcount", req.HopCount, "err", err) return } - if req.SkipCheck { - syncing := false - osp.LogFields(olog.Bool("skipCheck", true)) + syncing := false - err = sp.Deliver(ctx, ch, s.priority, syncing) - if err != nil { - log.Warn("ERROR in handleRetrieveRequestMsg", "err", err) - } - osp.LogFields(olog.Bool("delivered", true)) - return + err = sp.Deliver(ctx, ch, Top, syncing) + if err != nil { + log.Warn("ERROR in handleRetrieveRequestMsg", "err", err) } - osp.LogFields(olog.Bool("skipCheck", false)) - select { - case streamer.deliveryC <- ch.Address()[:]: - case <-streamer.quit: - } - + osp.LogFields(olog.Bool("delivered", true)) }() return nil @@ -225,6 +137,9 @@ func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req int processReceivedChunksCount.Inc(1) + // record the last time we received a chunk delivery message + lastReceivedChunksMsg.Update(time.Now().UnixNano()) + var msg *ChunkDeliveryMsg var mode chunk.ModePut switch r := req.(type) { @@ -244,31 +159,25 @@ func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req int case *ChunkDeliveryMsgSyncing: msg = (*ChunkDeliveryMsg)(r) mode = chunk.ModePutSync + case *ChunkDeliveryMsg: + msg = r + mode = chunk.ModePutSync } - // retrieve the span for the originating retrieverequest - spanID := fmt.Sprintf("stream.send.request.%v.%v", sp.ID(), msg.Addr) - span := tracing.ShiftSpanByKey(spanID) - log.Trace("handle.chunk.delivery", "ref", msg.Addr, "from peer", sp.ID()) go func() { defer osp.Finish() - if span != nil { - span.LogFields(olog.String("finish", "from handleChunkDeliveryMsg")) - defer span.Finish() - } - msg.peer = sp log.Trace("handle.chunk.delivery", "put", msg.Addr) - _, err := d.chunkStore.Put(ctx, mode, storage.NewChunk(msg.Addr, msg.SData)) + _, err := d.netStore.Put(ctx, mode, storage.NewChunk(msg.Addr, msg.SData)) if err != nil { if err == storage.ErrChunkInvalid { // we removed this log because it spams the logs // TODO: Enable this log line // log.Warn("invalid chunk delivered", "peer", sp.ID(), "chunk", msg.Addr, ) - msg.peer.Drop(err) + msg.peer.Drop() } } log.Trace("handle.chunk.delivery", "done put", msg.Addr, "err", err) @@ -276,6 +185,12 @@ func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req int return nil } +func (d *Delivery) Close() { + d.kad.CloseNeighbourhoodDepthC() + d.kad.CloseAddrCountC() + close(d.quit) +} + // RequestFromPeers sends a chunk retrieve request to a peer // The most eligible peer that hasn't already been sent to is chosen // TODO: define "eligible" diff --git a/swarm/network/stream/delivery_test.go b/swarm/network/stream/delivery_test.go index 801e6d98a..4037243c1 100644 --- a/swarm/network/stream/delivery_test.go +++ b/swarm/network/stream/delivery_test.go @@ -41,64 +41,11 @@ import ( "github.com/ethereum/go-ethereum/swarm/testutil" ) -//Tests initializing a retrieve request -func TestStreamerRetrieveRequest(t *testing.T) { - regOpts := &RegistryOptions{ - Retrieval: RetrievalClientOnly, - Syncing: SyncingDisabled, - } - tester, streamer, _, teardown, err := newStreamerTester(regOpts) - if err != nil { - t.Fatal(err) - } - defer teardown() - - node := tester.Nodes[0] - - ctx := context.Background() - req := network.NewRequest( - storage.Address(hash0[:]), - true, - &sync.Map{}, - ) - streamer.delivery.RequestFromPeers(ctx, req) - - stream := NewStream(swarmChunkServerStreamName, "", true) - - err = tester.TestExchanges(p2ptest.Exchange{ - Label: "RetrieveRequestMsg", - Expects: []p2ptest.Expect{ - { //start expecting a subscription for RETRIEVE_REQUEST due to `RetrievalClientOnly` - Code: 4, - Msg: &SubscribeMsg{ - Stream: stream, - History: nil, - Priority: Top, - }, - Peer: node.ID(), - }, - { //expect a retrieve request message for the given hash - Code: 5, - Msg: &RetrieveRequestMsg{ - Addr: hash0[:], - SkipCheck: true, - }, - Peer: node.ID(), - }, - }, - }) - - if err != nil { - t.Fatalf("Expected no error, got %v", err) - } -} - //Test requesting a chunk from a peer then issuing a "empty" OfferedHashesMsg (no hashes available yet) //Should time out as the peer does not have the chunk (no syncing happened previously) func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) { - tester, streamer, _, teardown, err := newStreamerTester(&RegistryOptions{ - Retrieval: RetrievalEnabled, - Syncing: SyncingDisabled, //do no syncing + tester, _, _, teardown, err := newStreamerTester(&RegistryOptions{ + Syncing: SyncingDisabled, //do no syncing }) if err != nil { t.Fatal(err) @@ -109,30 +56,8 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) { chunk := storage.NewChunk(storage.Address(hash0[:]), nil) - peer := streamer.getPeer(node.ID()) - - stream := NewStream(swarmChunkServerStreamName, "", true) - //simulate pre-subscription to RETRIEVE_REQUEST stream on peer - peer.handleSubscribeMsg(context.TODO(), &SubscribeMsg{ - Stream: stream, - History: nil, - Priority: Top, - }) - //test the exchange err = tester.TestExchanges(p2ptest.Exchange{ - Expects: []p2ptest.Expect{ - { //first expect a subscription to the RETRIEVE_REQUEST stream - Code: 4, - Msg: &SubscribeMsg{ - Stream: stream, - History: nil, - Priority: Top, - }, - Peer: node.ID(), - }, - }, - }, p2ptest.Exchange{ Label: "RetrieveRequestMsg", Triggers: []p2ptest.Trigger{ { //then the actual RETRIEVE_REQUEST.... @@ -159,7 +84,7 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) { //should fail with a timeout as the peer we are requesting //the chunk from does not have the chunk - expectedError := `exchange #1 "RetrieveRequestMsg": timed out` + expectedError := `exchange #0 "RetrieveRequestMsg": timed out` if err == nil || err.Error() != expectedError { t.Fatalf("Expected error %v, got %v", expectedError, err) } @@ -168,9 +93,8 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) { // upstream request server receives a retrieve Request and responds with // offered hashes or delivery if skipHash is set to true func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) { - tester, streamer, localStore, teardown, err := newStreamerTester(&RegistryOptions{ - Retrieval: RetrievalEnabled, - Syncing: SyncingDisabled, + tester, _, localStore, teardown, err := newStreamerTester(&RegistryOptions{ + Syncing: SyncingDisabled, }) if err != nil { t.Fatal(err) @@ -179,36 +103,14 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) { node := tester.Nodes[0] - peer := streamer.getPeer(node.ID()) - - stream := NewStream(swarmChunkServerStreamName, "", true) - - peer.handleSubscribeMsg(context.TODO(), &SubscribeMsg{ - Stream: stream, - History: nil, - Priority: Top, - }) - - hash := storage.Address(hash0[:]) - ch := storage.NewChunk(hash, hash) + hash := storage.Address(hash1[:]) + ch := storage.NewChunk(hash, hash1[:]) _, err = localStore.Put(context.TODO(), chunk.ModePutUpload, ch) if err != nil { t.Fatalf("Expected no err got %v", err) } err = tester.TestExchanges(p2ptest.Exchange{ - Expects: []p2ptest.Expect{ - { - Code: 4, - Msg: &SubscribeMsg{ - Stream: stream, - History: nil, - Priority: Top, - }, - Peer: node.ID(), - }, - }, - }, p2ptest.Exchange{ Label: "RetrieveRequestMsg", Triggers: []p2ptest.Trigger{ { @@ -219,53 +121,12 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) { Peer: node.ID(), }, }, - Expects: []p2ptest.Expect{ - { - Code: 1, - Msg: &OfferedHashesMsg{ - HandoverProof: &HandoverProof{ - Handover: &Handover{}, - }, - Hashes: hash, - From: 0, - // TODO: why is this 32??? - To: 32, - Stream: stream, - }, - Peer: node.ID(), - }, - }, - }) - - if err != nil { - t.Fatal(err) - } - - hash = storage.Address(hash1[:]) - ch = storage.NewChunk(hash, hash1[:]) - _, err = localStore.Put(context.TODO(), chunk.ModePutUpload, ch) - if err != nil { - t.Fatalf("Expected no err got %v", err) - } - - err = tester.TestExchanges(p2ptest.Exchange{ - Label: "RetrieveRequestMsg", - Triggers: []p2ptest.Trigger{ - { - Code: 5, - Msg: &RetrieveRequestMsg{ - Addr: hash, - SkipCheck: true, - }, - Peer: node.ID(), - }, - }, Expects: []p2ptest.Expect{ { Code: 6, Msg: &ChunkDeliveryMsg{ - Addr: hash, - SData: hash, + Addr: ch.Address(), + SData: ch.Data(), }, Peer: node.ID(), }, @@ -359,8 +220,7 @@ func TestRequestFromPeersWithLightNode(t *testing.T) { func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) { tester, streamer, localStore, teardown, err := newStreamerTester(&RegistryOptions{ - Retrieval: RetrievalDisabled, - Syncing: SyncingDisabled, + Syncing: SyncingDisabled, }) if err != nil { t.Fatal(err) @@ -472,7 +332,6 @@ func testDeliveryFromNodes(t *testing.T, nodes, chunkCount int, skipCheck bool) r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ SkipCheck: skipCheck, Syncing: SyncingDisabled, - Retrieval: RetrievalEnabled, }, nil) bucket.Store(bucketKeyRegistry, r) @@ -623,7 +482,6 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, chunkCount int, skipCheck b r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ SkipCheck: skipCheck, Syncing: SyncingDisabled, - Retrieval: RetrievalDisabled, SyncUpdateDelay: 0, }, nil) bucket.Store(bucketKeyRegistry, r) diff --git a/swarm/network/stream/intervals_test.go b/swarm/network/stream/intervals_test.go index 1f2cdcada..660954857 100644 --- a/swarm/network/stream/intervals_test.go +++ b/swarm/network/stream/intervals_test.go @@ -29,7 +29,6 @@ import ( "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/simulations/adapters" - "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/network/simulation" "github.com/ethereum/go-ethereum/swarm/state" "github.com/ethereum/go-ethereum/swarm/storage" @@ -67,7 +66,6 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) { } r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ - Retrieval: RetrievalDisabled, Syncing: SyncingRegisterOnly, SkipCheck: skipCheck, }, nil) @@ -288,20 +286,20 @@ func enableNotifications(r *Registry, peerID enode.ID, s Stream) error { type testExternalClient struct { hashes chan []byte - store chunk.FetchStore + netStore *storage.NetStore enableNotificationsC chan struct{} } -func newTestExternalClient(store chunk.FetchStore) *testExternalClient { +func newTestExternalClient(netStore *storage.NetStore) *testExternalClient { return &testExternalClient{ hashes: make(chan []byte), - store: store, + netStore: netStore, enableNotificationsC: make(chan struct{}), } } func (c *testExternalClient) NeedData(ctx context.Context, hash []byte) func(context.Context) error { - wait := c.store.FetchFunc(ctx, storage.Address(hash)) + wait := c.netStore.FetchFunc(ctx, storage.Address(hash)) if wait == nil { return nil } diff --git a/swarm/network/stream/lightnode_test.go b/swarm/network/stream/lightnode_test.go index 501660fab..eb4e73d47 100644 --- a/swarm/network/stream/lightnode_test.go +++ b/swarm/network/stream/lightnode_test.go @@ -21,95 +21,11 @@ import ( p2ptest "github.com/ethereum/go-ethereum/p2p/testing" ) -// This test checks the default behavior of the server, that is -// when it is serving Retrieve requests. -func TestLigthnodeRetrieveRequestWithRetrieve(t *testing.T) { - registryOptions := &RegistryOptions{ - Retrieval: RetrievalClientOnly, - Syncing: SyncingDisabled, - } - tester, _, _, teardown, err := newStreamerTester(registryOptions) - if err != nil { - t.Fatal(err) - } - defer teardown() - - node := tester.Nodes[0] - - stream := NewStream(swarmChunkServerStreamName, "", false) - - err = tester.TestExchanges(p2ptest.Exchange{ - Label: "SubscribeMsg", - Triggers: []p2ptest.Trigger{ - { - Code: 4, - Msg: &SubscribeMsg{ - Stream: stream, - }, - Peer: node.ID(), - }, - }, - }) - if err != nil { - t.Fatalf("Got %v", err) - } - - err = tester.TestDisconnected(&p2ptest.Disconnect{Peer: node.ID()}) - if err == nil || err.Error() != "timed out waiting for peers to disconnect" { - t.Fatalf("Expected no disconnect, got %v", err) - } -} - -// This test checks the Lightnode behavior of server, when serving Retrieve -// requests are disabled -func TestLigthnodeRetrieveRequestWithoutRetrieve(t *testing.T) { - registryOptions := &RegistryOptions{ - Retrieval: RetrievalDisabled, - Syncing: SyncingDisabled, - } - tester, _, _, teardown, err := newStreamerTester(registryOptions) - if err != nil { - t.Fatal(err) - } - defer teardown() - - node := tester.Nodes[0] - - stream := NewStream(swarmChunkServerStreamName, "", false) - - err = tester.TestExchanges( - p2ptest.Exchange{ - Label: "SubscribeMsg", - Triggers: []p2ptest.Trigger{ - { - Code: 4, - Msg: &SubscribeMsg{ - Stream: stream, - }, - Peer: node.ID(), - }, - }, - Expects: []p2ptest.Expect{ - { - Code: 7, - Msg: &SubscribeErrorMsg{ - Error: "stream RETRIEVE_REQUEST not registered", - }, - Peer: node.ID(), - }, - }, - }) - if err != nil { - t.Fatalf("Got %v", err) - } -} - // This test checks the default behavior of the server, that is // when syncing is enabled. func TestLigthnodeRequestSubscriptionWithSync(t *testing.T) { registryOptions := &RegistryOptions{ - Retrieval: RetrievalDisabled, - Syncing: SyncingRegisterOnly, + Syncing: SyncingRegisterOnly, } tester, _, _, teardown, err := newStreamerTester(registryOptions) if err != nil { @@ -153,8 +69,7 @@ func TestLigthnodeRequestSubscriptionWithSync(t *testing.T) { // when syncing is disabled. func TestLigthnodeRequestSubscriptionWithoutSync(t *testing.T) { registryOptions := &RegistryOptions{ - Retrieval: RetrievalDisabled, - Syncing: SyncingDisabled, + Syncing: SyncingDisabled, } tester, _, _, teardown, err := newStreamerTester(registryOptions) if err != nil { diff --git a/swarm/network/stream/messages.go b/swarm/network/stream/messages.go index b293724cc..b60d2fcc9 100644 --- a/swarm/network/stream/messages.go +++ b/swarm/network/stream/messages.go @@ -247,7 +247,7 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg case err := <-errC: if err != nil { log.Debug("client.handleOfferedHashesMsg() error waiting for chunk, dropping peer", "peer", p.ID(), "err", err) - p.Drop(err) + p.Drop() return } case <-ctx.Done(): @@ -289,7 +289,7 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg case err := <-c.next: if err != nil { log.Warn("c.next error dropping peer", "err", err) - p.Drop(err) + p.Drop() return } case <-c.quit: diff --git a/swarm/network/stream/peer.go b/swarm/network/stream/peer.go index 152814bd4..98b237ce2 100644 --- a/swarm/network/stream/peer.go +++ b/swarm/network/stream/peer.go @@ -90,7 +90,7 @@ func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer { err := p.Send(wmsg.Context, wmsg.Msg) if err != nil { log.Error("Message send error, dropping peer", "peer", p.ID(), "err", err) - p.Drop(err) + p.Drop() } }) diff --git a/swarm/network/stream/snapshot_retrieval_test.go b/swarm/network/stream/snapshot_retrieval_test.go index 2d5935276..e34f87951 100644 --- a/swarm/network/stream/snapshot_retrieval_test.go +++ b/swarm/network/stream/snapshot_retrieval_test.go @@ -119,7 +119,6 @@ var retrievalSimServiceMap = map[string]simulation.ServiceFunc{ } r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ - Retrieval: RetrievalEnabled, Syncing: SyncingAutoSubscribe, SyncUpdateDelay: syncUpdateDelay, }, nil) diff --git a/swarm/network/stream/snapshot_sync_test.go b/swarm/network/stream/snapshot_sync_test.go index 605c9dbeb..fefdb7c9f 100644 --- a/swarm/network/stream/snapshot_sync_test.go +++ b/swarm/network/stream/snapshot_sync_test.go @@ -118,7 +118,6 @@ var simServiceMap = map[string]simulation.ServiceFunc{ store := state.NewInmemoryStore() r := NewRegistry(addr.ID(), delivery, netStore, store, &RegistryOptions{ - Retrieval: RetrievalDisabled, Syncing: SyncingAutoSubscribe, SyncUpdateDelay: 3 * time.Second, }, nil) diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go index 0d990da5c..10a8f7ec5 100644 --- a/swarm/network/stream/stream.go +++ b/swarm/network/stream/stream.go @@ -18,7 +18,6 @@ package stream import ( "context" - "errors" "fmt" "math" "reflect" @@ -30,11 +29,11 @@ import ( "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/protocols" "github.com/ethereum/go-ethereum/rpc" - "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/log" "github.com/ethereum/go-ethereum/swarm/network" "github.com/ethereum/go-ethereum/swarm/network/stream/intervals" "github.com/ethereum/go-ethereum/swarm/state" + "github.com/ethereum/go-ethereum/swarm/storage" ) const ( @@ -49,7 +48,6 @@ const ( // Enumerate options for syncing and retrieval type SyncingOption int -type RetrievalOption int // Syncing options const ( @@ -61,17 +59,6 @@ const ( SyncingAutoSubscribe ) -const ( - // Retrieval disabled. Used mostly for tests to isolate syncing features (i.e. syncing only) - RetrievalDisabled RetrievalOption = iota - // Only the client side of the retrieve request is registered. - // (light nodes do not serve retrieve requests) - // once the client is registered, subscription to retrieve request stream is always sent - RetrievalClientOnly - // Both client and server funcs are registered, subscribe sent automatically - RetrievalEnabled -) - // subscriptionFunc is used to determine what to do in order to perform subscriptions // usually we would start to really subscribe to nodes, but for tests other functionality may be needed // (see TestRequestPeerSubscriptions in streamer_test.go) @@ -90,7 +77,6 @@ type Registry struct { peers map[enode.ID]*Peer delivery *Delivery intervalsStore state.Store - autoRetrieval bool // automatically subscribe to retrieve request stream maxPeerServers int spec *protocols.Spec //this protocol's spec balance protocols.Balance //implements protocols.Balance, for accounting @@ -101,22 +87,19 @@ type Registry struct { // RegistryOptions holds optional values for NewRegistry constructor. type RegistryOptions struct { SkipCheck bool - Syncing SyncingOption // Defines syncing behavior - Retrieval RetrievalOption // Defines retrieval behavior + Syncing SyncingOption // Defines syncing behavior SyncUpdateDelay time.Duration MaxPeerServers int // The limit of servers for each peer in registry } // NewRegistry is Streamer constructor -func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore chunk.FetchStore, intervalsStore state.Store, options *RegistryOptions, balance protocols.Balance) *Registry { +func NewRegistry(localID enode.ID, delivery *Delivery, netStore *storage.NetStore, intervalsStore state.Store, options *RegistryOptions, balance protocols.Balance) *Registry { if options == nil { options = &RegistryOptions{} } if options.SyncUpdateDelay <= 0 { options.SyncUpdateDelay = 15 * time.Second } - // check if retrieval has been disabled - retrieval := options.Retrieval != RetrievalDisabled quit := make(chan struct{}) @@ -128,7 +111,6 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore chunk.Fetc peers: make(map[enode.ID]*Peer), delivery: delivery, intervalsStore: intervalsStore, - autoRetrieval: retrieval, maxPeerServers: options.MaxPeerServers, balance: balance, quit: quit, @@ -139,27 +121,10 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore chunk.Fetc streamer.api = NewAPI(streamer) delivery.getPeer = streamer.getPeer - // if retrieval is enabled, register the server func, so that retrieve requests will be served (non-light nodes only) - if options.Retrieval == RetrievalEnabled { - streamer.RegisterServerFunc(swarmChunkServerStreamName, func(_ *Peer, _ string, live bool) (Server, error) { - if !live { - return nil, errors.New("only live retrieval requests supported") - } - return NewSwarmChunkServer(delivery.chunkStore), nil - }) - } - - // if retrieval is not disabled, register the client func (both light nodes and normal nodes can issue retrieve requests) - if options.Retrieval != RetrievalDisabled { - streamer.RegisterClientFunc(swarmChunkServerStreamName, func(p *Peer, t string, live bool) (Client, error) { - return NewSwarmSyncerClient(p, syncChunkStore, NewStream(swarmChunkServerStreamName, t, live)) - }) - } - // If syncing is not disabled, the syncing functions are registered (both client and server) if options.Syncing != SyncingDisabled { - RegisterSwarmSyncerServer(streamer, syncChunkStore) - RegisterSwarmSyncerClient(streamer, syncChunkStore) + RegisterSwarmSyncerServer(streamer, netStore) + RegisterSwarmSyncerClient(streamer, netStore) } // if syncing is set to automatically subscribe to the syncing stream, start the subscription process @@ -381,7 +346,7 @@ func (r *Registry) Subscribe(peerId enode.ID, s Stream, h *Range, priority uint8 } log.Debug("Subscribe ", "peer", peerId, "stream", s, "history", h) - return peer.SendPriority(context.TODO(), msg, priority) + return peer.Send(context.TODO(), msg) } func (r *Registry) Unsubscribe(peerId enode.ID, s Stream) error { @@ -422,8 +387,7 @@ func (r *Registry) Quit(peerId enode.ID, s Stream) error { func (r *Registry) Close() error { // Stop sending neighborhood depth change and address count // change from Kademlia that were initiated in NewRegistry constructor. - r.delivery.kad.CloseNeighbourhoodDepthC() - r.delivery.kad.CloseAddrCountC() + r.delivery.Close() close(r.quit) return r.intervalsStore.Close() } @@ -464,13 +428,6 @@ func (r *Registry) Run(p *network.BzzPeer) error { defer close(sp.quit) defer sp.close() - if r.autoRetrieval && !p.LightNode { - err := r.Subscribe(p.ID(), NewStream(swarmChunkServerStreamName, "", true), nil, Top) - if err != nil { - return err - } - } - return sp.Run(sp.HandleMsg) } @@ -619,19 +576,66 @@ func (p *Peer) HandleMsg(ctx context.Context, msg interface{}) error { return p.handleUnsubscribeMsg(msg) case *OfferedHashesMsg: - return p.handleOfferedHashesMsg(ctx, msg) + go func() { + err := p.handleOfferedHashesMsg(ctx, msg) + if err != nil { + log.Error(err.Error()) + p.Drop() + } + }() + return nil case *TakeoverProofMsg: - return p.handleTakeoverProofMsg(ctx, msg) + go func() { + err := p.handleTakeoverProofMsg(ctx, msg) + if err != nil { + log.Error(err.Error()) + p.Drop() + } + }() + return nil case *WantedHashesMsg: - return p.handleWantedHashesMsg(ctx, msg) + go func() { + err := p.handleWantedHashesMsg(ctx, msg) + if err != nil { + log.Error(err.Error()) + p.Drop() + } + }() + return nil - case *ChunkDeliveryMsgRetrieval, *ChunkDeliveryMsgSyncing: - return p.streamer.delivery.handleChunkDeliveryMsg(ctx, p, msg) + case *ChunkDeliveryMsgRetrieval: + // handling chunk delivery is the same for retrieval and syncing, so let's cast the msg + go func() { + err := p.streamer.delivery.handleChunkDeliveryMsg(ctx, p, ((*ChunkDeliveryMsg)(msg))) + if err != nil { + log.Error(err.Error()) + p.Drop() + } + }() + return nil + + case *ChunkDeliveryMsgSyncing: + // handling chunk delivery is the same for retrieval and syncing, so let's cast the msg + go func() { + err := p.streamer.delivery.handleChunkDeliveryMsg(ctx, p, ((*ChunkDeliveryMsg)(msg))) + if err != nil { + log.Error(err.Error()) + p.Drop() + } + }() + return nil case *RetrieveRequestMsg: - return p.streamer.delivery.handleRetrieveRequestMsg(ctx, p, msg) + go func() { + err := p.streamer.delivery.handleRetrieveRequestMsg(ctx, p, msg) + if err != nil { + log.Error(err.Error()) + p.Drop() + } + }() + return nil case *RequestSubscriptionMsg: return p.handleRequestSubscription(ctx, msg) @@ -762,7 +766,7 @@ func (c *client) batchDone(p *Peer, req *OfferedHashesMsg, hashes []byte) error return err } - if err := p.SendPriority(context.TODO(), tp, c.priority); err != nil { + if err := p.Send(context.TODO(), tp); err != nil { return err } if c.to > 0 && tp.Takeover.End >= c.to { @@ -964,15 +968,13 @@ func (api *API) UnsubscribeStream(peerId enode.ID, s Stream) error { } /* -GetPeerSubscriptions is a API function which allows to query a peer for stream subscriptions it has. +GetPeerServerSubscriptions is a API function which allows to query a peer for stream subscriptions it has. It can be called via RPC. It returns a map of node IDs with an array of string representations of Stream objects. */ -func (api *API) GetPeerSubscriptions() map[string][]string { - //create the empty map +func (api *API) GetPeerServerSubscriptions() map[string][]string { pstreams := make(map[string][]string) - //iterate all streamer peers api.streamer.peersMu.RLock() defer api.streamer.peersMu.RUnlock() diff --git a/swarm/network/stream/streamer_test.go b/swarm/network/stream/streamer_test.go index bdd3087bb..c7da05014 100644 --- a/swarm/network/stream/streamer_test.go +++ b/swarm/network/stream/streamer_test.go @@ -539,7 +539,7 @@ func TestStreamerDownstreamCorruptHashesMsgExchange(t *testing.T) { t.Fatal(err) } - expectedError := errors.New("Message handler error: (msg code 1): error invalid hashes length (len: 40)") + expectedError := errors.New("subprotocol error") if err := tester.TestDisconnected(&p2ptest.Disconnect{Peer: node.ID(), Error: expectedError}); err != nil { t.Fatal(err) } @@ -779,7 +779,6 @@ func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) { func TestMaxPeerServersWithUnsubscribe(t *testing.T) { var maxPeerServers = 6 tester, streamer, _, teardown, err := newStreamerTester(&RegistryOptions{ - Retrieval: RetrievalDisabled, Syncing: SyncingDisabled, MaxPeerServers: maxPeerServers, }) @@ -940,8 +939,7 @@ func TestMaxPeerServersWithoutUnsubscribe(t *testing.T) { //`Price` interface implementation func TestHasPriceImplementation(t *testing.T) { _, r, _, teardown, err := newStreamerTester(&RegistryOptions{ - Retrieval: RetrievalDisabled, - Syncing: SyncingDisabled, + Syncing: SyncingDisabled, }) if err != nil { t.Fatal(err) @@ -1123,8 +1121,8 @@ func TestRequestPeerSubscriptions(t *testing.T) { } } -// TestGetSubscriptions is a unit test for the api.GetPeerSubscriptions() function -func TestGetSubscriptions(t *testing.T) { +// TestGetServerSubscriptions is a unit test for the api.GetPeerServerSubscriptions() function +func TestGetServerSubscriptions(t *testing.T) { // create an amount of dummy peers testPeerCount := 8 // every peer will have this amount of dummy servers @@ -1135,7 +1133,7 @@ func TestGetSubscriptions(t *testing.T) { r := &Registry{} api := NewAPI(r) // call once, at this point should be empty - regs := api.GetPeerSubscriptions() + regs := api.GetPeerServerSubscriptions() if len(regs) != 0 { t.Fatal("Expected subscription count to be 0, but it is not") } @@ -1159,7 +1157,7 @@ func TestGetSubscriptions(t *testing.T) { r.peers = peerMap // call the subscriptions again - regs = api.GetPeerSubscriptions() + regs = api.GetPeerServerSubscriptions() // count how many (fake) subscriptions there are cnt := 0 for _, reg := range regs { @@ -1175,11 +1173,11 @@ func TestGetSubscriptions(t *testing.T) { } /* -TestGetSubscriptionsRPC sets up a simulation network of `nodeCount` nodes, +TestGetServerSubscriptionsRPC sets up a simulation network of `nodeCount` nodes, starts the simulation, waits for SyncUpdateDelay in order to kick off stream registration, then tests that there are subscriptions. */ -func TestGetSubscriptionsRPC(t *testing.T) { +func TestGetServerSubscriptionsRPC(t *testing.T) { if testutil.RaceEnabled && os.Getenv("TRAVIS") == "true" { t.Skip("flaky with -race on Travis") @@ -1226,7 +1224,6 @@ func TestGetSubscriptionsRPC(t *testing.T) { // configure so that sync registrations actually happen r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ - Retrieval: RetrievalEnabled, Syncing: SyncingAutoSubscribe, //enable sync registrations SyncUpdateDelay: syncUpdateDelay, }, nil) @@ -1321,7 +1318,7 @@ func TestGetSubscriptionsRPC(t *testing.T) { //ask it for subscriptions pstreams := make(map[string][]string) - err = client.Call(&pstreams, "stream_getPeerSubscriptions") + err = client.Call(&pstreams, "stream_getPeerServerSubscriptions") if err != nil { return fmt.Errorf("client call stream_getPeerSubscriptions: %v", err) } diff --git a/swarm/network/stream/syncer.go b/swarm/network/stream/syncer.go index c573da5d2..79b04a307 100644 --- a/swarm/network/stream/syncer.go +++ b/swarm/network/stream/syncer.go @@ -34,27 +34,27 @@ const ( // * live request delivery with or without checkback // * (live/non-live historical) chunk syncing per proximity bin type SwarmSyncerServer struct { - po uint8 - store chunk.FetchStore - quit chan struct{} + po uint8 + netStore *storage.NetStore + quit chan struct{} } // NewSwarmSyncerServer is constructor for SwarmSyncerServer -func NewSwarmSyncerServer(po uint8, syncChunkStore chunk.FetchStore) (*SwarmSyncerServer, error) { +func NewSwarmSyncerServer(po uint8, netStore *storage.NetStore) (*SwarmSyncerServer, error) { return &SwarmSyncerServer{ - po: po, - store: syncChunkStore, - quit: make(chan struct{}), + po: po, + netStore: netStore, + quit: make(chan struct{}), }, nil } -func RegisterSwarmSyncerServer(streamer *Registry, syncChunkStore chunk.FetchStore) { +func RegisterSwarmSyncerServer(streamer *Registry, netStore *storage.NetStore) { streamer.RegisterServerFunc("SYNC", func(_ *Peer, t string, _ bool) (Server, error) { po, err := ParseSyncBinKey(t) if err != nil { return nil, err } - return NewSwarmSyncerServer(po, syncChunkStore) + return NewSwarmSyncerServer(po, netStore) }) // streamer.RegisterServerFunc(stream, func(p *Peer) (Server, error) { // return NewOutgoingProvableSwarmSyncer(po, db) @@ -68,7 +68,7 @@ func (s *SwarmSyncerServer) Close() { // GetData retrieves the actual chunk from netstore func (s *SwarmSyncerServer) GetData(ctx context.Context, key []byte) ([]byte, error) { - ch, err := s.store.Get(ctx, chunk.ModeGetSync, storage.Address(key)) + ch, err := s.netStore.Get(ctx, chunk.ModeGetSync, storage.Address(key)) if err != nil { return nil, err } @@ -77,7 +77,7 @@ func (s *SwarmSyncerServer) GetData(ctx context.Context, key []byte) ([]byte, er // SessionIndex returns current storage bin (po) index. func (s *SwarmSyncerServer) SessionIndex() (uint64, error) { - return s.store.LastPullSubscriptionBinID(s.po) + return s.netStore.LastPullSubscriptionBinID(s.po) } // SetNextBatch retrieves the next batch of hashes from the localstore. @@ -88,7 +88,7 @@ func (s *SwarmSyncerServer) SessionIndex() (uint64, error) { // are added in batchTimeout period, the batch will be returned. This function // will block until new chunks are received from localstore pull subscription. func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) { - descriptors, stop := s.store.SubscribePull(context.Background(), s.po, from, to) + descriptors, stop := s.netStore.SubscribePull(context.Background(), s.po, from, to) defer stop() const batchTimeout = 2 * time.Second @@ -118,7 +118,7 @@ func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint6 // This is the most naive approach to label the chunk as synced // allowing it to be garbage collected. A proper way requires // validating that the chunk is successfully stored by the peer. - err := s.store.Set(context.Background(), chunk.ModeSetSync, d.Address) + err := s.netStore.Set(context.Background(), chunk.ModeSetSync, d.Address) if err != nil { return nil, 0, 0, nil, err } @@ -158,67 +158,31 @@ func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint6 // SwarmSyncerClient type SwarmSyncerClient struct { - store chunk.FetchStore - peer *Peer - stream Stream + netStore *storage.NetStore + peer *Peer + stream Stream } // NewSwarmSyncerClient is a contructor for provable data exchange syncer -func NewSwarmSyncerClient(p *Peer, store chunk.FetchStore, stream Stream) (*SwarmSyncerClient, error) { +func NewSwarmSyncerClient(p *Peer, netStore *storage.NetStore, stream Stream) (*SwarmSyncerClient, error) { return &SwarmSyncerClient{ - store: store, - peer: p, - stream: stream, + netStore: netStore, + peer: p, + stream: stream, }, nil } -// // NewIncomingProvableSwarmSyncer is a contructor for provable data exchange syncer -// func NewIncomingProvableSwarmSyncer(po int, priority int, index uint64, sessionAt uint64, intervals []uint64, sessionRoot storage.Address, chunker *storage.PyramidChunker, store storage.ChunkStore, p Peer) *SwarmSyncerClient { -// retrieveC := make(storage.Chunk, chunksCap) -// RunChunkRequestor(p, retrieveC) -// storeC := make(storage.Chunk, chunksCap) -// RunChunkStorer(store, storeC) -// s := &SwarmSyncerClient{ -// po: po, -// priority: priority, -// sessionAt: sessionAt, -// start: index, -// end: index, -// nextC: make(chan struct{}, 1), -// intervals: intervals, -// sessionRoot: sessionRoot, -// sessionReader: chunker.Join(sessionRoot, retrieveC), -// retrieveC: retrieveC, -// storeC: storeC, -// } -// return s -// } - -// // StartSyncing is called on the Peer to start the syncing process -// // the idea is that it is called only after kademlia is close to healthy -// func StartSyncing(s *Streamer, peerId enode.ID, po uint8, nn bool) { -// lastPO := po -// if nn { -// lastPO = maxPO -// } -// -// for i := po; i <= lastPO; i++ { -// s.Subscribe(peerId, "SYNC", newSyncLabel("LIVE", po), 0, 0, High, true) -// s.Subscribe(peerId, "SYNC", newSyncLabel("HISTORY", po), 0, 0, Mid, false) -// } -// } - // RegisterSwarmSyncerClient registers the client constructor function for // to handle incoming sync streams -func RegisterSwarmSyncerClient(streamer *Registry, store chunk.FetchStore) { +func RegisterSwarmSyncerClient(streamer *Registry, netStore *storage.NetStore) { streamer.RegisterClientFunc("SYNC", func(p *Peer, t string, live bool) (Client, error) { - return NewSwarmSyncerClient(p, store, NewStream("SYNC", t, live)) + return NewSwarmSyncerClient(p, netStore, NewStream("SYNC", t, live)) }) } // NeedData func (s *SwarmSyncerClient) NeedData(ctx context.Context, key []byte) (wait func(context.Context) error) { - return s.store.FetchFunc(ctx, key) + return s.netStore.FetchFunc(ctx, key) } // BatchDone diff --git a/swarm/network/stream/syncer_test.go b/swarm/network/stream/syncer_test.go index a8651f386..b787c7bb8 100644 --- a/swarm/network/stream/syncer_test.go +++ b/swarm/network/stream/syncer_test.go @@ -83,7 +83,6 @@ func testSyncBetweenNodes(t *testing.T, nodes, chunkCount int, skipCheck bool, p } r := NewRegistry(addr.ID(), delivery, netStore, store, &RegistryOptions{ - Retrieval: RetrievalDisabled, Syncing: SyncingAutoSubscribe, SkipCheck: skipCheck, }, nil) @@ -232,8 +231,7 @@ func TestSameVersionID(t *testing.T) { } r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ - Retrieval: RetrievalDisabled, - Syncing: SyncingAutoSubscribe, + Syncing: SyncingAutoSubscribe, }, nil) bucket.Store(bucketKeyRegistry, r) @@ -296,8 +294,7 @@ func TestDifferentVersionID(t *testing.T) { } r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ - Retrieval: RetrievalDisabled, - Syncing: SyncingAutoSubscribe, + Syncing: SyncingAutoSubscribe, }, nil) bucket.Store(bucketKeyRegistry, r) diff --git a/swarm/storage/localstore/export.go b/swarm/storage/localstore/export.go index bbea1d877..411392b4e 100644 --- a/swarm/storage/localstore/export.go +++ b/swarm/storage/localstore/export.go @@ -169,7 +169,7 @@ func (db *DB) Import(r io.Reader, legacy bool) (count int64, err error) { wg.Done() <-tokenPool default: - err := db.Put(ctx, chunk.ModePutUpload, ch) + _, err := db.Put(ctx, chunk.ModePutUpload, ch) if err != nil { errC <- err } diff --git a/swarm/swarm.go b/swarm/swarm.go index 7f5ee8361..2f025d9cc 100644 --- a/swarm/swarm.go +++ b/swarm/swarm.go @@ -204,15 +204,9 @@ func NewSwarm(config *api.Config, mockStore *mock.NodeStore) (self *Swarm, err e syncing = stream.SyncingDisabled } - retrieval := stream.RetrievalEnabled - if config.LightNodeEnabled { - retrieval = stream.RetrievalClientOnly - } - registryOptions := &stream.RegistryOptions{ SkipCheck: config.DeliverySkipCheck, Syncing: syncing, - Retrieval: retrieval, SyncUpdateDelay: config.SyncUpdateDelay, MaxPeerServers: config.MaxStreamPeerServers, }