swarm/storage: fix access count on dbstore after cache hit (#17978)
Access count was not incremented when chunk was retrieved from cache. So the garbage collector might have deleted the most frequently accessed chunk from disk. Co-authored-by: Ferenc Szabo <ferenc.szabo@ethereum.org>
This commit is contained in:
		
							parent
							
								
									1212c7b844
								
							
						
					
					
						commit
						8080265f3f
					
				| @ -186,6 +186,20 @@ func NewLDBStore(params *LDBStoreParams) (s *LDBStore, err error) { | ||||
| 	return s, nil | ||||
| } | ||||
| 
 | ||||
| // MarkAccessed increments the access counter as a best effort for a chunk, so
 | ||||
| // the chunk won't get garbage collected.
 | ||||
| func (s *LDBStore) MarkAccessed(addr Address) { | ||||
| 	s.lock.Lock() | ||||
| 	defer s.lock.Unlock() | ||||
| 
 | ||||
| 	if s.closed { | ||||
| 		return | ||||
| 	} | ||||
| 
 | ||||
| 	proximity := s.po(addr) | ||||
| 	s.tryAccessIdx(addr, proximity) | ||||
| } | ||||
| 
 | ||||
| // initialize and set values for processing of gc round
 | ||||
| func (s *LDBStore) startGC(c int) { | ||||
| 
 | ||||
| @ -349,6 +363,7 @@ func (s *LDBStore) collectGarbage() error { | ||||
| 			s.delete(s.gc.batch.Batch, index, keyIdx, po) | ||||
| 			singleIterationCount++ | ||||
| 			s.gc.count++ | ||||
| 			log.Trace("garbage collect enqueued chunk for deletion", "key", hash) | ||||
| 
 | ||||
| 			// break if target is not on max garbage batch boundary
 | ||||
| 			if s.gc.count >= s.gc.target { | ||||
| @ -685,12 +700,7 @@ func (s *LDBStore) Put(ctx context.Context, chunk Chunk) error { | ||||
| 	idata, err := s.db.Get(ikey) | ||||
| 	if err != nil { | ||||
| 		s.doPut(chunk, &index, po) | ||||
| 	} else { | ||||
| 		log.Debug("ldbstore.put: chunk already exists, only update access", "key", chunk.Address(), "po", po) | ||||
| 		decodeIndex(idata, &index) | ||||
| 	} | ||||
| 	index.Access = s.accessCnt | ||||
| 	s.accessCnt++ | ||||
| 	idata = encodeIndex(&index) | ||||
| 	s.batch.Put(ikey, idata) | ||||
| 
 | ||||
| @ -723,7 +733,8 @@ func (s *LDBStore) doPut(chunk Chunk, index *dpaDBIndex, po uint8) { | ||||
| 	s.entryCnt++ | ||||
| 	dbEntryCount.Inc(1) | ||||
| 	s.dataIdx++ | ||||
| 
 | ||||
| 	index.Access = s.accessCnt | ||||
| 	s.accessCnt++ | ||||
| 	cntKey := make([]byte, 2) | ||||
| 	cntKey[0] = keyDistanceCnt | ||||
| 	cntKey[1] = po | ||||
| @ -796,18 +807,23 @@ func newMockEncodeDataFunc(mockStore *mock.NodeStore) func(chunk Chunk) []byte { | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| // try to find index; if found, update access cnt and return true
 | ||||
| func (s *LDBStore) tryAccessIdx(ikey []byte, po uint8, index *dpaDBIndex) bool { | ||||
| // tryAccessIdx tries to find index entry. If found then increments the access
 | ||||
| // count for garbage collection and returns the index entry and true for found,
 | ||||
| // otherwise returns nil and false.
 | ||||
| func (s *LDBStore) tryAccessIdx(addr Address, po uint8) (*dpaDBIndex, bool) { | ||||
| 	ikey := getIndexKey(addr) | ||||
| 	idata, err := s.db.Get(ikey) | ||||
| 	if err != nil { | ||||
| 		return false | ||||
| 		return nil, false | ||||
| 	} | ||||
| 
 | ||||
| 	index := new(dpaDBIndex) | ||||
| 	decodeIndex(idata, index) | ||||
| 	oldGCIdxKey := getGCIdxKey(index) | ||||
| 	s.batch.Put(keyAccessCnt, U64ToBytes(s.accessCnt)) | ||||
| 	s.accessCnt++ | ||||
| 	index.Access = s.accessCnt | ||||
| 	idata = encodeIndex(index) | ||||
| 	s.accessCnt++ | ||||
| 	s.batch.Put(ikey, idata) | ||||
| 	newGCIdxKey := getGCIdxKey(index) | ||||
| 	newGCIdxData := getGCIdxValue(index, po, ikey) | ||||
| @ -817,7 +833,7 @@ func (s *LDBStore) tryAccessIdx(ikey []byte, po uint8, index *dpaDBIndex) bool { | ||||
| 	case s.batchesC <- struct{}{}: | ||||
| 	default: | ||||
| 	} | ||||
| 	return true | ||||
| 	return index, true | ||||
| } | ||||
| 
 | ||||
| // GetSchema is returning the current named schema of the datastore as read from LevelDB
 | ||||
| @ -858,12 +874,12 @@ func (s *LDBStore) Get(_ context.Context, addr Address) (chunk Chunk, err error) | ||||
| 
 | ||||
| // TODO: To conform with other private methods of this object indices should not be updated
 | ||||
| func (s *LDBStore) get(addr Address) (chunk *chunk, err error) { | ||||
| 	var indx dpaDBIndex | ||||
| 	if s.closed { | ||||
| 		return nil, ErrDBClosed | ||||
| 	} | ||||
| 	proximity := s.po(addr) | ||||
| 	if s.tryAccessIdx(getIndexKey(addr), proximity, &indx) { | ||||
| 	index, found := s.tryAccessIdx(addr, proximity) | ||||
| 	if found { | ||||
| 		var data []byte | ||||
| 		if s.getDataFunc != nil { | ||||
| 			// if getDataFunc is defined, use it to retrieve the chunk data
 | ||||
| @ -874,12 +890,12 @@ func (s *LDBStore) get(addr Address) (chunk *chunk, err error) { | ||||
| 			} | ||||
| 		} else { | ||||
| 			// default DbStore functionality to retrieve chunk data
 | ||||
| 			datakey := getDataKey(indx.Idx, proximity) | ||||
| 			datakey := getDataKey(index.Idx, proximity) | ||||
| 			data, err = s.db.Get(datakey) | ||||
| 			log.Trace("ldbstore.get retrieve", "key", addr, "indexkey", indx.Idx, "datakey", fmt.Sprintf("%x", datakey), "proximity", proximity) | ||||
| 			log.Trace("ldbstore.get retrieve", "key", addr, "indexkey", index.Idx, "datakey", fmt.Sprintf("%x", datakey), "proximity", proximity) | ||||
| 			if err != nil { | ||||
| 				log.Trace("ldbstore.get chunk found but could not be accessed", "key", addr, "err", err) | ||||
| 				s.deleteNow(&indx, getIndexKey(addr), s.po(addr)) | ||||
| 				s.deleteNow(index, getIndexKey(addr), s.po(addr)) | ||||
| 				return | ||||
| 			} | ||||
| 		} | ||||
|  | ||||
| @ -31,7 +31,6 @@ import ( | ||||
| 	ch "github.com/ethereum/go-ethereum/swarm/chunk" | ||||
| 	"github.com/ethereum/go-ethereum/swarm/log" | ||||
| 	"github.com/ethereum/go-ethereum/swarm/storage/mock/mem" | ||||
| 
 | ||||
| 	ldberrors "github.com/syndtr/goleveldb/leveldb/errors" | ||||
| ) | ||||
| 
 | ||||
| @ -105,6 +104,46 @@ func testDbStoreCorrect(n int, chunksize int64, mock bool, t *testing.T) { | ||||
| 	testStoreCorrect(db, n, chunksize, t) | ||||
| } | ||||
| 
 | ||||
| func TestMarkAccessed(t *testing.T) { | ||||
| 	db, cleanup, err := newTestDbStore(false, true) | ||||
| 	defer cleanup() | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("init dbStore failed: %v", err) | ||||
| 	} | ||||
| 
 | ||||
| 	h := GenerateRandomChunk(ch.DefaultSize) | ||||
| 
 | ||||
| 	db.Put(context.Background(), h) | ||||
| 
 | ||||
| 	var index dpaDBIndex | ||||
| 	addr := h.Address() | ||||
| 	idxk := getIndexKey(addr) | ||||
| 
 | ||||
| 	idata, err := db.db.Get(idxk) | ||||
| 	if err != nil { | ||||
| 		t.Fatal(err) | ||||
| 	} | ||||
| 	decodeIndex(idata, &index) | ||||
| 
 | ||||
| 	if index.Access != 0 { | ||||
| 		t.Fatalf("Expected the access index to be %d, but it is %d", 0, index.Access) | ||||
| 	} | ||||
| 
 | ||||
| 	db.MarkAccessed(addr) | ||||
| 	db.writeCurrentBatch() | ||||
| 
 | ||||
| 	idata, err = db.db.Get(idxk) | ||||
| 	if err != nil { | ||||
| 		t.Fatal(err) | ||||
| 	} | ||||
| 	decodeIndex(idata, &index) | ||||
| 
 | ||||
| 	if index.Access != 1 { | ||||
| 		t.Fatalf("Expected the access index to be %d, but it is %d", 1, index.Access) | ||||
| 	} | ||||
| 
 | ||||
| } | ||||
| 
 | ||||
| func TestDbStoreRandom_1(t *testing.T) { | ||||
| 	testDbStoreRandom(1, 0, false, t) | ||||
| } | ||||
|  | ||||
| @ -153,6 +153,7 @@ func (ls *LocalStore) get(ctx context.Context, addr Address) (chunk Chunk, err e | ||||
| 
 | ||||
| 	if err == nil { | ||||
| 		metrics.GetOrRegisterCounter("localstore.get.cachehit", nil).Inc(1) | ||||
| 		go ls.DbStore.MarkAccessed(addr) | ||||
| 		return chunk, nil | ||||
| 	} | ||||
| 
 | ||||
|  | ||||
| @ -21,6 +21,7 @@ import ( | ||||
| 	"io/ioutil" | ||||
| 	"os" | ||||
| 	"testing" | ||||
| 	"time" | ||||
| 
 | ||||
| 	ch "github.com/ethereum/go-ethereum/swarm/chunk" | ||||
| ) | ||||
| @ -144,3 +145,67 @@ func put(store *LocalStore, n int, f func(i int64) Chunk) (hs []Address, errs [] | ||||
| 	} | ||||
| 	return hs, errs | ||||
| } | ||||
| 
 | ||||
| // TestGetFrequentlyAccessedChunkWontGetGarbageCollected tests that the most
 | ||||
| // frequently accessed chunk is not garbage collected from LDBStore, i.e.,
 | ||||
| // from disk when we are at the capacity and garbage collector runs. For that
 | ||||
| // we start putting random chunks into the DB while continuously accessing the
 | ||||
| // chunk we care about then check if we can still retrieve it from disk.
 | ||||
| func TestGetFrequentlyAccessedChunkWontGetGarbageCollected(t *testing.T) { | ||||
| 	ldbCap := defaultGCRatio | ||||
| 	store, cleanup := setupLocalStore(t, ldbCap) | ||||
| 	defer cleanup() | ||||
| 
 | ||||
| 	var chunks []Chunk | ||||
| 	for i := 0; i < ldbCap; i++ { | ||||
| 		chunks = append(chunks, GenerateRandomChunk(ch.DefaultSize)) | ||||
| 	} | ||||
| 
 | ||||
| 	mostAccessed := chunks[0].Address() | ||||
| 	for _, chunk := range chunks { | ||||
| 		if err := store.Put(context.Background(), chunk); err != nil { | ||||
| 			t.Fatal(err) | ||||
| 		} | ||||
| 
 | ||||
| 		if _, err := store.Get(context.Background(), mostAccessed); err != nil { | ||||
| 			t.Fatal(err) | ||||
| 		} | ||||
| 		// Add time for MarkAccessed() to be able to finish in a separate Goroutine
 | ||||
| 		time.Sleep(1 * time.Millisecond) | ||||
| 	} | ||||
| 
 | ||||
| 	store.DbStore.collectGarbage() | ||||
| 	if _, err := store.DbStore.Get(context.Background(), mostAccessed); err != nil { | ||||
| 		t.Logf("most frequntly accessed chunk not found on disk (key: %v)", mostAccessed) | ||||
| 		t.Fatal(err) | ||||
| 	} | ||||
| 
 | ||||
| } | ||||
| 
 | ||||
| func setupLocalStore(t *testing.T, ldbCap int) (ls *LocalStore, cleanup func()) { | ||||
| 	t.Helper() | ||||
| 
 | ||||
| 	var err error | ||||
| 	datadir, err := ioutil.TempDir("", "storage") | ||||
| 	if err != nil { | ||||
| 		t.Fatal(err) | ||||
| 	} | ||||
| 
 | ||||
| 	params := &LocalStoreParams{ | ||||
| 		StoreParams: NewStoreParams(uint64(ldbCap), uint(ldbCap), nil, nil), | ||||
| 	} | ||||
| 	params.Init(datadir) | ||||
| 
 | ||||
| 	store, err := NewLocalStore(params, nil) | ||||
| 	if err != nil { | ||||
| 		_ = os.RemoveAll(datadir) | ||||
| 		t.Fatal(err) | ||||
| 	} | ||||
| 
 | ||||
| 	cleanup = func() { | ||||
| 		store.Close() | ||||
| 		_ = os.RemoveAll(datadir) | ||||
| 	} | ||||
| 
 | ||||
| 	return store, cleanup | ||||
| } | ||||
|  | ||||
		Loading…
	
		Reference in New Issue
	
	Block a user