diff --git a/swarm/storage/localstore/gc.go b/swarm/storage/localstore/gc.go index 7718d1e58..ebaba2d8f 100644 --- a/swarm/storage/localstore/gc.go +++ b/swarm/storage/localstore/gc.go @@ -117,6 +117,8 @@ var ( // run. GC run iterates on gcIndex and removes older items // form retrieval and other indexes. func (db *DB) collectGarbageWorker() { + defer close(db.collectGarbageWorkerDone) + for { select { case <-db.collectGarbageTrigger: @@ -132,7 +134,7 @@ func (db *DB) collectGarbageWorker() { db.triggerGarbageCollection() } - if testHookCollectGarbage != nil { + if collectedCount > 0 && testHookCollectGarbage != nil { testHookCollectGarbage(collectedCount) } case <-db.close: @@ -243,6 +245,8 @@ func (db *DB) triggerGarbageCollection() { // writeGCSizeDelay duration to avoid very frequent // database operations. func (db *DB) writeGCSizeWorker() { + defer close(db.writeGCSizeWorkerDone) + for { select { case <-db.writeGCSizeTrigger: diff --git a/swarm/storage/localstore/gc_test.go b/swarm/storage/localstore/gc_test.go index 322b84665..60309d7fa 100644 --- a/swarm/storage/localstore/gc_test.go +++ b/swarm/storage/localstore/gc_test.go @@ -118,15 +118,6 @@ func testDB_collectGarbageWorker(t *testing.T) { t.Fatal(err) } }) - - // cleanup: drain the last testHookCollectGarbageChan - // element before calling deferred functions not to block - // collectGarbageWorker loop, preventing the race in - // setting testHookCollectGarbage function - select { - case <-testHookCollectGarbageChan: - default: - } } // TestDB_collectGarbageWorker_withRequests is a helper test function @@ -290,6 +281,7 @@ func TestDB_gcSize(t *testing.T) { if err != nil { t.Fatal(err) } + defer db.Close() t.Run("gc index size", newIndexGCSizeTest(db)) diff --git a/swarm/storage/localstore/localstore.go b/swarm/storage/localstore/localstore.go index 7a9fb54f5..f92a9c1f2 100644 --- a/swarm/storage/localstore/localstore.go +++ b/swarm/storage/localstore/localstore.go @@ -107,6 +107,12 @@ type DB struct { // this channel is closed when close function is called // to terminate other goroutines close chan struct{} + + // protect Close method from exiting before + // garbage collection and gc size write workers + // are done + collectGarbageWorkerDone chan struct{} + writeGCSizeWorkerDone chan struct{} } // Options struct holds optional parameters for configuring DB. @@ -138,9 +144,11 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) { // need to be buffered with the size of 1 // to signal another event if it // is triggered during already running function - collectGarbageTrigger: make(chan struct{}, 1), - writeGCSizeTrigger: make(chan struct{}, 1), - close: make(chan struct{}), + collectGarbageTrigger: make(chan struct{}, 1), + writeGCSizeTrigger: make(chan struct{}, 1), + close: make(chan struct{}), + collectGarbageWorkerDone: make(chan struct{}), + writeGCSizeWorkerDone: make(chan struct{}), } if db.capacity <= 0 { db.capacity = defaultCapacity @@ -361,6 +369,21 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) { func (db *DB) Close() (err error) { close(db.close) db.updateGCWG.Wait() + + // wait for gc worker and gc size write workers to + // return before closing the shed + timeout := time.After(5 * time.Second) + select { + case <-db.collectGarbageWorkerDone: + case <-timeout: + log.Error("localstore: collect garbage worker did not return after db close") + } + select { + case <-db.writeGCSizeWorkerDone: + case <-timeout: + log.Error("localstore: write gc size worker did not return after db close") + } + if err := db.writeGCSize(db.getGCSize()); err != nil { log.Error("localstore: write gc size", "err", err) } diff --git a/swarm/storage/localstore/localstore_test.go b/swarm/storage/localstore/localstore_test.go index c7309d3cd..6b48d54e9 100644 --- a/swarm/storage/localstore/localstore_test.go +++ b/swarm/storage/localstore/localstore_test.go @@ -163,6 +163,7 @@ func BenchmarkNew(b *testing.B) { if err != nil { b.Fatal(err) } + defer db.Close() uploader := db.NewPutter(ModePutUpload) syncer := db.NewSetter(ModeSetSync) for i := 0; i < count; i++ { diff --git a/swarm/storage/localstore/subscription_pull_test.go b/swarm/storage/localstore/subscription_pull_test.go index 16fe8b0dd..9800329ea 100644 --- a/swarm/storage/localstore/subscription_pull_test.go +++ b/swarm/storage/localstore/subscription_pull_test.go @@ -20,13 +20,11 @@ import ( "bytes" "context" "fmt" - "os" "sync" "testing" "time" "github.com/ethereum/go-ethereum/swarm/storage" - "github.com/ethereum/go-ethereum/swarm/testutil" ) // TestDB_SubscribePull uploads some chunks before and after @@ -34,12 +32,6 @@ import ( // all addresses are received in the right order // for expected proximity order bins. func TestDB_SubscribePull(t *testing.T) { - - if testutil.RaceEnabled && os.Getenv("TRAVIS") == "true" { - t.Skip("does not complete with -race on Travis") - // Note: related ticket TODO - } - db, cleanupFunc := newTestDB(t, nil) defer cleanupFunc() @@ -87,12 +79,6 @@ func TestDB_SubscribePull(t *testing.T) { // validates if all addresses are received in the right order // for expected proximity order bins. func TestDB_SubscribePull_multiple(t *testing.T) { - - if testutil.RaceEnabled && os.Getenv("TRAVIS") == "true" { - t.Skip("does not complete with -race on Travis") - // Note: related ticket TODO - } - db, cleanupFunc := newTestDB(t, nil) defer cleanupFunc() @@ -146,12 +132,6 @@ func TestDB_SubscribePull_multiple(t *testing.T) { // and validates if all expected addresses are received in the // right order for expected proximity order bins. func TestDB_SubscribePull_since(t *testing.T) { - - if testutil.RaceEnabled && os.Getenv("TRAVIS") == "true" { - t.Skip("does not complete with -race on Travis") - // Note: related ticket TODO - } - db, cleanupFunc := newTestDB(t, nil) defer cleanupFunc() @@ -171,6 +151,9 @@ func TestDB_SubscribePull_since(t *testing.T) { })() uploadRandomChunks := func(count int, wanted bool) (last map[uint8]ChunkDescriptor) { + addrsMu.Lock() + defer addrsMu.Unlock() + last = make(map[uint8]ChunkDescriptor) for i := 0; i < count; i++ { chunk := generateRandomChunk() @@ -182,7 +165,6 @@ func TestDB_SubscribePull_since(t *testing.T) { bin := db.po(chunk.Address()) - addrsMu.Lock() if _, ok := addrs[bin]; !ok { addrs[bin] = make([]storage.Address, 0) } @@ -190,7 +172,6 @@ func TestDB_SubscribePull_since(t *testing.T) { addrs[bin] = append(addrs[bin], chunk.Address()) wantedChunksCount++ } - addrsMu.Unlock() lastTimestampMu.RLock() storeTimestamp := lastTimestamp @@ -242,12 +223,6 @@ func TestDB_SubscribePull_since(t *testing.T) { // and validates if all expected addresses are received in the // right order for expected proximity order bins. func TestDB_SubscribePull_until(t *testing.T) { - - if testutil.RaceEnabled && os.Getenv("TRAVIS") == "true" { - t.Skip("does not complete with -race on Travis") - // Note: related ticket TODO - } - db, cleanupFunc := newTestDB(t, nil) defer cleanupFunc() @@ -267,6 +242,9 @@ func TestDB_SubscribePull_until(t *testing.T) { })() uploadRandomChunks := func(count int, wanted bool) (last map[uint8]ChunkDescriptor) { + addrsMu.Lock() + defer addrsMu.Unlock() + last = make(map[uint8]ChunkDescriptor) for i := 0; i < count; i++ { chunk := generateRandomChunk() @@ -278,7 +256,6 @@ func TestDB_SubscribePull_until(t *testing.T) { bin := db.po(chunk.Address()) - addrsMu.Lock() if _, ok := addrs[bin]; !ok { addrs[bin] = make([]storage.Address, 0) } @@ -286,7 +263,6 @@ func TestDB_SubscribePull_until(t *testing.T) { addrs[bin] = append(addrs[bin], chunk.Address()) wantedChunksCount++ } - addrsMu.Unlock() lastTimestampMu.RLock() storeTimestamp := lastTimestamp @@ -337,12 +313,6 @@ func TestDB_SubscribePull_until(t *testing.T) { // and until arguments, and validates if all expected addresses // are received in the right order for expected proximity order bins. func TestDB_SubscribePull_sinceAndUntil(t *testing.T) { - - if testutil.RaceEnabled && os.Getenv("TRAVIS") == "true" { - t.Skip("does not complete with -race on Travis") - // Note: related ticket TODO - } - db, cleanupFunc := newTestDB(t, nil) defer cleanupFunc() @@ -362,6 +332,9 @@ func TestDB_SubscribePull_sinceAndUntil(t *testing.T) { })() uploadRandomChunks := func(count int, wanted bool) (last map[uint8]ChunkDescriptor) { + addrsMu.Lock() + defer addrsMu.Unlock() + last = make(map[uint8]ChunkDescriptor) for i := 0; i < count; i++ { chunk := generateRandomChunk() @@ -373,7 +346,6 @@ func TestDB_SubscribePull_sinceAndUntil(t *testing.T) { bin := db.po(chunk.Address()) - addrsMu.Lock() if _, ok := addrs[bin]; !ok { addrs[bin] = make([]storage.Address, 0) } @@ -381,7 +353,6 @@ func TestDB_SubscribePull_sinceAndUntil(t *testing.T) { addrs[bin] = append(addrs[bin], chunk.Address()) wantedChunksCount++ } - addrsMu.Unlock() lastTimestampMu.RLock() storeTimestamp := lastTimestamp @@ -442,6 +413,9 @@ func TestDB_SubscribePull_sinceAndUntil(t *testing.T) { // uploadRandomChunksBin uploads random chunks to database and adds them to // the map of addresses ber bin. func uploadRandomChunksBin(t *testing.T, db *DB, uploader *Putter, addrs map[uint8][]storage.Address, addrsMu *sync.Mutex, wantedChunksCount *int, count int) { + addrsMu.Lock() + defer addrsMu.Unlock() + for i := 0; i < count; i++ { chunk := generateRandomChunk() @@ -450,13 +424,11 @@ func uploadRandomChunksBin(t *testing.T, db *DB, uploader *Putter, addrs map[uin t.Fatal(err) } - addrsMu.Lock() bin := db.po(chunk.Address()) if _, ok := addrs[bin]; !ok { addrs[bin] = make([]storage.Address, 0) } addrs[bin] = append(addrs[bin], chunk.Address()) - addrsMu.Unlock() *wantedChunksCount++ } @@ -473,19 +445,24 @@ func readPullSubscriptionBin(ctx context.Context, bin uint8, ch <-chan ChunkDesc if !ok { return } + var err error addrsMu.Lock() if i+1 > len(addrs[bin]) { - errChan <- fmt.Errorf("got more chunk addresses %v, then expected %v, for bin %v", i+1, len(addrs[bin]), bin) + err = fmt.Errorf("got more chunk addresses %v, then expected %v, for bin %v", i+1, len(addrs[bin]), bin) + } else { + want := addrs[bin][i] + if !bytes.Equal(got.Address, want) { + err = fmt.Errorf("got chunk address %v in bin %v %s, want %s", i, bin, got.Address.Hex(), want) + } } - want := addrs[bin][i] addrsMu.Unlock() - var err error - if !bytes.Equal(got.Address, want) { - err = fmt.Errorf("got chunk address %v in bin %v %s, want %s", i, bin, got.Address.Hex(), want) - } i++ // send one and only one error per received address - errChan <- err + select { + case errChan <- err: + case <-ctx.Done(): + return + } case <-ctx.Done(): return } diff --git a/swarm/storage/localstore/subscription_push_test.go b/swarm/storage/localstore/subscription_push_test.go index 73e7c25f7..0c8d7d0b9 100644 --- a/swarm/storage/localstore/subscription_push_test.go +++ b/swarm/storage/localstore/subscription_push_test.go @@ -40,6 +40,9 @@ func TestDB_SubscribePush(t *testing.T) { var chunksMu sync.Mutex uploadRandomChunks := func(count int) { + chunksMu.Lock() + defer chunksMu.Unlock() + for i := 0; i < count; i++ { chunk := generateRandomChunk() @@ -48,9 +51,7 @@ func TestDB_SubscribePush(t *testing.T) { t.Fatal(err) } - chunksMu.Lock() chunks = append(chunks, chunk) - chunksMu.Unlock() } } @@ -90,7 +91,11 @@ func TestDB_SubscribePush(t *testing.T) { } i++ // send one and only one error per received address - errChan <- err + select { + case errChan <- err: + case <-ctx.Done(): + return + } case <-ctx.Done(): return } @@ -123,6 +128,9 @@ func TestDB_SubscribePush_multiple(t *testing.T) { var addrsMu sync.Mutex uploadRandomChunks := func(count int) { + addrsMu.Lock() + defer addrsMu.Unlock() + for i := 0; i < count; i++ { chunk := generateRandomChunk() @@ -131,9 +139,7 @@ func TestDB_SubscribePush_multiple(t *testing.T) { t.Fatal(err) } - addrsMu.Lock() addrs = append(addrs, chunk.Address()) - addrsMu.Unlock() } } @@ -175,7 +181,11 @@ func TestDB_SubscribePush_multiple(t *testing.T) { } i++ // send one and only one error per received address - errChan <- err + select { + case errChan <- err: + case <-ctx.Done(): + return + } case <-ctx.Done(): return }