diff --git a/swarm/network/stream/syncer.go b/swarm/network/stream/syncer.go index 7a61950ed..7957a8bf7 100644 --- a/swarm/network/stream/syncer.go +++ b/swarm/network/stream/syncer.go @@ -93,10 +93,6 @@ func (s *SwarmSyncerServer) SessionIndex() (uint64, error) { // are added in batchTimeout period, the batch will be returned. This function // will block until new chunks are received from localstore pull subscription. func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) { - //TODO: maybe add unit test for intervals usage in netstore/localstore together with SwarmSyncerServer? - if from > 0 { - from-- - } batchStart := time.Now() descriptors, stop := s.netStore.SubscribePull(context.Background(), s.po, from, to) defer stop() diff --git a/swarm/storage/localstore/subscription_pull.go b/swarm/storage/localstore/subscription_pull.go index ce539924b..0f7e48729 100644 --- a/swarm/storage/localstore/subscription_pull.go +++ b/swarm/storage/localstore/subscription_pull.go @@ -31,9 +31,9 @@ import ( // SubscribePull returns a channel that provides chunk addresses and stored times from pull syncing index. // Pull syncing index can be only subscribed to a particular proximity order bin. If since -// is not 0, the iteration will start from the first item stored after that id. If until is not 0, +// is not 0, the iteration will start from the since item (the item with binID == since). If until is not 0, // only chunks stored up to this id will be sent to the channel, and the returned channel will be -// closed. The since-until interval is open on since side, and closed on until side: (since,until] <=> [since+1,until]. Returned stop +// closed. The since-until interval is closed on since side, and closed on until side: [since,until]. Returned stop // function will terminate current and further iterations without errors, and also close the returned channel. // Make sure that you check the second returned parameter from the channel to stop iteration when its value // is false. @@ -135,7 +135,9 @@ func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until uint64) log.Error("localstore pull subscription iteration", "bin", bin, "since", since, "until", until, "err", err) return } - first = false + if count > 0 { + first = false + } case <-stopChan: // terminate the subscription // on stop diff --git a/swarm/storage/localstore/subscription_pull_test.go b/swarm/storage/localstore/subscription_pull_test.go index bf364ed44..95a2fa8b1 100644 --- a/swarm/storage/localstore/subscription_pull_test.go +++ b/swarm/storage/localstore/subscription_pull_test.go @@ -28,6 +28,55 @@ import ( "github.com/ethereum/go-ethereum/swarm/shed" ) +// TestDB_SubscribePull_first is a regression test for the first=false (from-1) bug +// The bug was that `first=false` was not behind an if-condition `if count > 0`. This resulted in chunks being missed, when +// the subscription is established before the chunk is actually uploaded. For example if a subscription is established with since=49, +// which means that the `SubscribePull` method should return chunk with BinID=49 via the channel, and the chunk for BinID=49 is uploaded, +// after the subscription, then it would have been skipped, where the correct behaviour is to not skip it and return it via the channel. +func TestDB_SubscribePull_first(t *testing.T) { + db, cleanupFunc := newTestDB(t, nil) + defer cleanupFunc() + + addrs := make(map[uint8][]chunk.Address) + var addrsMu sync.Mutex + var wantedChunksCount int + + // prepopulate database with some chunks + // before the subscription + uploadRandomChunksBin(t, db, addrs, &addrsMu, &wantedChunksCount, 100) + + // any bin should do the trick + bin := uint8(1) + + chunksInGivenBin := uint64(len(addrs[bin])) + + errc := make(chan error) + + since := chunksInGivenBin + 1 + + go func() { + ch, stop := db.SubscribePull(context.TODO(), bin, since, 0) + defer stop() + + chnk := <-ch + + if chnk.BinID != since { + errc <- fmt.Errorf("expected chunk.BinID to be %v , but got %v", since, chnk.BinID) + } else { + errc <- nil + } + }() + + time.Sleep(100 * time.Millisecond) + + uploadRandomChunksBin(t, db, addrs, &addrsMu, &wantedChunksCount, 100) + + err := <-errc + if err != nil { + t.Fatal(err) + } +} + // TestDB_SubscribePull uploads some chunks before and after // pull syncing subscription is created and validates if // all addresses are received in the right order