swarm/storage: set false, only when we get a chunk back (#19599)
This commit is contained in:
parent
f2612ac948
commit
30263ad37d
@ -93,10 +93,6 @@ func (s *SwarmSyncerServer) SessionIndex() (uint64, error) {
|
|||||||
// are added in batchTimeout period, the batch will be returned. This function
|
// are added in batchTimeout period, the batch will be returned. This function
|
||||||
// will block until new chunks are received from localstore pull subscription.
|
// will block until new chunks are received from localstore pull subscription.
|
||||||
func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) {
|
func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) {
|
||||||
//TODO: maybe add unit test for intervals usage in netstore/localstore together with SwarmSyncerServer?
|
|
||||||
if from > 0 {
|
|
||||||
from--
|
|
||||||
}
|
|
||||||
batchStart := time.Now()
|
batchStart := time.Now()
|
||||||
descriptors, stop := s.netStore.SubscribePull(context.Background(), s.po, from, to)
|
descriptors, stop := s.netStore.SubscribePull(context.Background(), s.po, from, to)
|
||||||
defer stop()
|
defer stop()
|
||||||
|
@ -31,9 +31,9 @@ import (
|
|||||||
|
|
||||||
// SubscribePull returns a channel that provides chunk addresses and stored times from pull syncing index.
|
// SubscribePull returns a channel that provides chunk addresses and stored times from pull syncing index.
|
||||||
// Pull syncing index can be only subscribed to a particular proximity order bin. If since
|
// Pull syncing index can be only subscribed to a particular proximity order bin. If since
|
||||||
// is not 0, the iteration will start from the first item stored after that id. If until is not 0,
|
// is not 0, the iteration will start from the since item (the item with binID == since). If until is not 0,
|
||||||
// only chunks stored up to this id will be sent to the channel, and the returned channel will be
|
// only chunks stored up to this id will be sent to the channel, and the returned channel will be
|
||||||
// closed. The since-until interval is open on since side, and closed on until side: (since,until] <=> [since+1,until]. Returned stop
|
// closed. The since-until interval is closed on since side, and closed on until side: [since,until]. Returned stop
|
||||||
// function will terminate current and further iterations without errors, and also close the returned channel.
|
// function will terminate current and further iterations without errors, and also close the returned channel.
|
||||||
// Make sure that you check the second returned parameter from the channel to stop iteration when its value
|
// Make sure that you check the second returned parameter from the channel to stop iteration when its value
|
||||||
// is false.
|
// is false.
|
||||||
@ -135,7 +135,9 @@ func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until uint64)
|
|||||||
log.Error("localstore pull subscription iteration", "bin", bin, "since", since, "until", until, "err", err)
|
log.Error("localstore pull subscription iteration", "bin", bin, "since", since, "until", until, "err", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
if count > 0 {
|
||||||
first = false
|
first = false
|
||||||
|
}
|
||||||
case <-stopChan:
|
case <-stopChan:
|
||||||
// terminate the subscription
|
// terminate the subscription
|
||||||
// on stop
|
// on stop
|
||||||
|
@ -28,6 +28,55 @@ import (
|
|||||||
"github.com/ethereum/go-ethereum/swarm/shed"
|
"github.com/ethereum/go-ethereum/swarm/shed"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// TestDB_SubscribePull_first is a regression test for the first=false (from-1) bug
|
||||||
|
// The bug was that `first=false` was not behind an if-condition `if count > 0`. This resulted in chunks being missed, when
|
||||||
|
// the subscription is established before the chunk is actually uploaded. For example if a subscription is established with since=49,
|
||||||
|
// which means that the `SubscribePull` method should return chunk with BinID=49 via the channel, and the chunk for BinID=49 is uploaded,
|
||||||
|
// after the subscription, then it would have been skipped, where the correct behaviour is to not skip it and return it via the channel.
|
||||||
|
func TestDB_SubscribePull_first(t *testing.T) {
|
||||||
|
db, cleanupFunc := newTestDB(t, nil)
|
||||||
|
defer cleanupFunc()
|
||||||
|
|
||||||
|
addrs := make(map[uint8][]chunk.Address)
|
||||||
|
var addrsMu sync.Mutex
|
||||||
|
var wantedChunksCount int
|
||||||
|
|
||||||
|
// prepopulate database with some chunks
|
||||||
|
// before the subscription
|
||||||
|
uploadRandomChunksBin(t, db, addrs, &addrsMu, &wantedChunksCount, 100)
|
||||||
|
|
||||||
|
// any bin should do the trick
|
||||||
|
bin := uint8(1)
|
||||||
|
|
||||||
|
chunksInGivenBin := uint64(len(addrs[bin]))
|
||||||
|
|
||||||
|
errc := make(chan error)
|
||||||
|
|
||||||
|
since := chunksInGivenBin + 1
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
ch, stop := db.SubscribePull(context.TODO(), bin, since, 0)
|
||||||
|
defer stop()
|
||||||
|
|
||||||
|
chnk := <-ch
|
||||||
|
|
||||||
|
if chnk.BinID != since {
|
||||||
|
errc <- fmt.Errorf("expected chunk.BinID to be %v , but got %v", since, chnk.BinID)
|
||||||
|
} else {
|
||||||
|
errc <- nil
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
|
||||||
|
uploadRandomChunksBin(t, db, addrs, &addrsMu, &wantedChunksCount, 100)
|
||||||
|
|
||||||
|
err := <-errc
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// TestDB_SubscribePull uploads some chunks before and after
|
// TestDB_SubscribePull uploads some chunks before and after
|
||||||
// pull syncing subscription is created and validates if
|
// pull syncing subscription is created and validates if
|
||||||
// all addresses are received in the right order
|
// all addresses are received in the right order
|
||||||
|
Loading…
Reference in New Issue
Block a user