forked from cerc-io/plugeth
swarm/storage: fix garbage collector index skew (#18080)
On file access LDBStore's tryAccessIdx() function created a faulty GC Index Data entry, because not indexing the ikey correctly. That caused the chunk addresses/hashes to start with '00' and the last two digits were dropped. => Incorrect chunk address. Besides the fix, the commit also contains a schema change which will run the CleanGCIndex() function to clean the GC index from erroneous entries. Note: CleanGCIndex() rebuilds the index from scratch which can take a really-really long time with a huge DB (possibly an hour).
This commit is contained in:
parent
4fecc7a3b1
commit
c41e1bd1eb
@ -57,7 +57,6 @@ var (
|
||||
|
||||
var (
|
||||
keyIndex = byte(0)
|
||||
keyOldData = byte(1)
|
||||
keyAccessCnt = []byte{2}
|
||||
keyEntryCnt = []byte{3}
|
||||
keyDataIdx = []byte{4}
|
||||
@ -285,6 +284,10 @@ func getGCIdxValue(index *dpaDBIndex, po uint8, addr Address) []byte {
|
||||
return val
|
||||
}
|
||||
|
||||
func parseGCIdxKey(key []byte) (byte, []byte) {
|
||||
return key[0], key[1:]
|
||||
}
|
||||
|
||||
func parseGCIdxEntry(accessCnt []byte, val []byte) (index *dpaDBIndex, po uint8, addr Address) {
|
||||
index = &dpaDBIndex{
|
||||
Idx: binary.BigEndian.Uint64(val[1:]),
|
||||
@ -569,47 +572,90 @@ func (s *LDBStore) Cleanup(f func(*chunk) bool) {
|
||||
log.Warn(fmt.Sprintf("Found %v errors out of %v entries. Removed %v chunks.", errorsFound, total, removed))
|
||||
}
|
||||
|
||||
func (s *LDBStore) ReIndex() {
|
||||
//Iterates over the database and checks that there are no faulty chunks
|
||||
// CleanGCIndex rebuilds the garbage collector index from scratch, while
|
||||
// removing inconsistent elements, e.g., indices with missing data chunks.
|
||||
// WARN: it's a pretty heavy, long running function.
|
||||
func (s *LDBStore) CleanGCIndex() error {
|
||||
s.lock.Lock()
|
||||
defer s.lock.Unlock()
|
||||
|
||||
batch := leveldb.Batch{}
|
||||
|
||||
var okEntryCount uint64
|
||||
var totalEntryCount uint64
|
||||
|
||||
// throw out all gc indices, we will rebuild from cleaned index
|
||||
it := s.db.NewIterator()
|
||||
startPosition := []byte{keyOldData}
|
||||
it.Seek(startPosition)
|
||||
var key []byte
|
||||
var errorsFound, total int
|
||||
it.Seek([]byte{keyGCIdx})
|
||||
var gcDeletes int
|
||||
for it.Valid() {
|
||||
key = it.Key()
|
||||
if (key == nil) || (key[0] != keyOldData) {
|
||||
rowType, _ := parseGCIdxKey(it.Key())
|
||||
if rowType != keyGCIdx {
|
||||
break
|
||||
}
|
||||
data := it.Value()
|
||||
hasher := s.hashfunc()
|
||||
hasher.Write(data)
|
||||
hash := hasher.Sum(nil)
|
||||
|
||||
newKey := make([]byte, 10)
|
||||
oldCntKey := make([]byte, 2)
|
||||
newCntKey := make([]byte, 2)
|
||||
oldCntKey[0] = keyDistanceCnt
|
||||
newCntKey[0] = keyDistanceCnt
|
||||
key[0] = keyData
|
||||
key[1] = s.po(Address(key[1:]))
|
||||
oldCntKey[1] = key[1]
|
||||
newCntKey[1] = s.po(Address(newKey[1:]))
|
||||
copy(newKey[2:], key[1:])
|
||||
newValue := append(hash, data...)
|
||||
|
||||
batch := new(leveldb.Batch)
|
||||
batch.Delete(key)
|
||||
s.bucketCnt[oldCntKey[1]]--
|
||||
batch.Put(oldCntKey, U64ToBytes(s.bucketCnt[oldCntKey[1]]))
|
||||
batch.Put(newKey, newValue)
|
||||
s.bucketCnt[newCntKey[1]]++
|
||||
batch.Put(newCntKey, U64ToBytes(s.bucketCnt[newCntKey[1]]))
|
||||
s.db.Write(batch)
|
||||
batch.Delete(it.Key())
|
||||
gcDeletes++
|
||||
it.Next()
|
||||
}
|
||||
log.Debug("gc", "deletes", gcDeletes)
|
||||
if err := s.db.Write(&batch); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
it.Seek([]byte{keyIndex})
|
||||
var idx dpaDBIndex
|
||||
var poPtrs [256]uint64
|
||||
for it.Valid() {
|
||||
rowType, chunkHash := parseGCIdxKey(it.Key())
|
||||
if rowType != keyIndex {
|
||||
break
|
||||
}
|
||||
err := decodeIndex(it.Value(), &idx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("corrupt index: %v", err)
|
||||
}
|
||||
po := s.po(chunkHash)
|
||||
|
||||
// if we don't find the data key, remove the entry
|
||||
dataKey := getDataKey(idx.Idx, po)
|
||||
_, err = s.db.Get(dataKey)
|
||||
if err != nil {
|
||||
log.Warn("deleting inconsistent index (missing data)", "key", chunkHash)
|
||||
batch.Delete(it.Key())
|
||||
} else {
|
||||
gcIdxKey := getGCIdxKey(&idx)
|
||||
gcIdxData := getGCIdxValue(&idx, po, chunkHash)
|
||||
batch.Put(gcIdxKey, gcIdxData)
|
||||
log.Trace("clean ok", "key", chunkHash, "gcKey", gcIdxKey, "gcData", gcIdxData)
|
||||
okEntryCount++
|
||||
if idx.Idx > poPtrs[po] {
|
||||
poPtrs[po] = idx.Idx
|
||||
}
|
||||
}
|
||||
totalEntryCount++
|
||||
it.Next()
|
||||
}
|
||||
|
||||
it.Release()
|
||||
log.Warn(fmt.Sprintf("Found %v errors out of %v entries", errorsFound, total))
|
||||
log.Debug("gc cleanup entries", "ok", okEntryCount, "total", totalEntryCount, "batchlen", batch.Len())
|
||||
|
||||
var entryCount [8]byte
|
||||
binary.BigEndian.PutUint64(entryCount[:], okEntryCount)
|
||||
batch.Put(keyEntryCnt, entryCount[:])
|
||||
var poKey [2]byte
|
||||
poKey[0] = keyDistanceCnt
|
||||
for i, poPtr := range poPtrs {
|
||||
poKey[1] = uint8(i)
|
||||
if poPtr == 0 {
|
||||
batch.Delete(poKey[:])
|
||||
} else {
|
||||
var idxCount [8]byte
|
||||
binary.BigEndian.PutUint64(idxCount[:], poPtr)
|
||||
batch.Put(poKey[:], idxCount[:])
|
||||
}
|
||||
}
|
||||
|
||||
return s.db.Write(&batch)
|
||||
}
|
||||
|
||||
// Delete is removes a chunk and updates indices.
|
||||
@ -826,7 +872,7 @@ func (s *LDBStore) tryAccessIdx(addr Address, po uint8) (*dpaDBIndex, bool) {
|
||||
s.accessCnt++
|
||||
s.batch.Put(ikey, idata)
|
||||
newGCIdxKey := getGCIdxKey(index)
|
||||
newGCIdxData := getGCIdxValue(index, po, ikey)
|
||||
newGCIdxData := getGCIdxValue(index, po, ikey[1:])
|
||||
s.batch.Delete(oldGCIdxKey)
|
||||
s.batch.Put(newGCIdxKey, newGCIdxData)
|
||||
select {
|
||||
@ -844,7 +890,7 @@ func (s *LDBStore) GetSchema() (string, error) {
|
||||
data, err := s.db.Get(keySchema)
|
||||
if err != nil {
|
||||
if err == leveldb.ErrNotFound {
|
||||
return "", nil
|
||||
return DbSchemaNone, nil
|
||||
}
|
||||
return "", err
|
||||
}
|
||||
|
@ -19,6 +19,7 @@ package storage
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
@ -623,6 +624,145 @@ func TestLDBStoreCollectGarbageAccessUnlikeIndex(t *testing.T) {
|
||||
log.Info("ldbstore", "total", n, "missing", missing, "entrycnt", ldb.entryCnt, "accesscnt", ldb.accessCnt)
|
||||
}
|
||||
|
||||
func TestCleanIndex(t *testing.T) {
|
||||
capacity := 5000
|
||||
n := 3
|
||||
|
||||
ldb, cleanup := newLDBStore(t)
|
||||
ldb.setCapacity(uint64(capacity))
|
||||
defer cleanup()
|
||||
|
||||
chunks, err := mputRandomChunks(ldb, n, 4096)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// remove the data of the first chunk
|
||||
po := ldb.po(chunks[0].Address()[:])
|
||||
dataKey := make([]byte, 10)
|
||||
dataKey[0] = keyData
|
||||
dataKey[1] = byte(po)
|
||||
// dataKey[2:10] = first chunk has storageIdx 0 on [2:10]
|
||||
if _, err := ldb.db.Get(dataKey); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := ldb.db.Delete(dataKey); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// remove the gc index row for the first chunk
|
||||
gcFirstCorrectKey := make([]byte, 9)
|
||||
gcFirstCorrectKey[0] = keyGCIdx
|
||||
if err := ldb.db.Delete(gcFirstCorrectKey); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// warp the gc data of the second chunk
|
||||
// this data should be correct again after the clean
|
||||
gcSecondCorrectKey := make([]byte, 9)
|
||||
gcSecondCorrectKey[0] = keyGCIdx
|
||||
binary.BigEndian.PutUint64(gcSecondCorrectKey[1:], uint64(1))
|
||||
gcSecondCorrectVal, err := ldb.db.Get(gcSecondCorrectKey)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
warpedGCVal := make([]byte, len(gcSecondCorrectVal)+1)
|
||||
copy(warpedGCVal[1:], gcSecondCorrectVal)
|
||||
if err := ldb.db.Delete(gcSecondCorrectKey); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := ldb.db.Put(gcSecondCorrectKey, warpedGCVal); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err := ldb.CleanGCIndex(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// the index without corresponding data should have been deleted
|
||||
idxKey := make([]byte, 33)
|
||||
idxKey[0] = keyIndex
|
||||
copy(idxKey[1:], chunks[0].Address())
|
||||
if _, err := ldb.db.Get(idxKey); err == nil {
|
||||
t.Fatalf("expected chunk 0 idx to be pruned: %v", idxKey)
|
||||
}
|
||||
|
||||
// the two other indices should be present
|
||||
copy(idxKey[1:], chunks[1].Address())
|
||||
if _, err := ldb.db.Get(idxKey); err != nil {
|
||||
t.Fatalf("expected chunk 1 idx to be present: %v", idxKey)
|
||||
}
|
||||
|
||||
copy(idxKey[1:], chunks[2].Address())
|
||||
if _, err := ldb.db.Get(idxKey); err != nil {
|
||||
t.Fatalf("expected chunk 2 idx to be present: %v", idxKey)
|
||||
}
|
||||
|
||||
// first gc index should still be gone
|
||||
if _, err := ldb.db.Get(gcFirstCorrectKey); err == nil {
|
||||
t.Fatalf("expected gc 0 idx to be pruned: %v", idxKey)
|
||||
}
|
||||
|
||||
// second gc index should still be fixed
|
||||
if _, err := ldb.db.Get(gcSecondCorrectKey); err != nil {
|
||||
t.Fatalf("expected gc 1 idx to be present: %v", idxKey)
|
||||
}
|
||||
|
||||
// third gc index should be unchanged
|
||||
binary.BigEndian.PutUint64(gcSecondCorrectKey[1:], uint64(2))
|
||||
if _, err := ldb.db.Get(gcSecondCorrectKey); err != nil {
|
||||
t.Fatalf("expected gc 2 idx to be present: %v", idxKey)
|
||||
}
|
||||
|
||||
c, err := ldb.db.Get(keyEntryCnt)
|
||||
if err != nil {
|
||||
t.Fatalf("expected gc 2 idx to be present: %v", idxKey)
|
||||
}
|
||||
|
||||
// entrycount should now be one less
|
||||
entryCount := binary.BigEndian.Uint64(c)
|
||||
if entryCount != 2 {
|
||||
t.Fatalf("expected entrycnt to be 2, was %d", c)
|
||||
}
|
||||
|
||||
// the chunks might accidentally be in the same bin
|
||||
// if so that bin counter will now be 2 - the highest added index.
|
||||
// if not, the total of them will be 3
|
||||
poBins := []uint8{ldb.po(chunks[1].Address()), ldb.po(chunks[2].Address())}
|
||||
if poBins[0] == poBins[1] {
|
||||
poBins = poBins[:1]
|
||||
}
|
||||
|
||||
var binTotal uint64
|
||||
var currentBin [2]byte
|
||||
currentBin[0] = keyDistanceCnt
|
||||
if len(poBins) == 1 {
|
||||
currentBin[1] = poBins[0]
|
||||
c, err := ldb.db.Get(currentBin[:])
|
||||
if err != nil {
|
||||
t.Fatalf("expected gc 2 idx to be present: %v", idxKey)
|
||||
}
|
||||
binCount := binary.BigEndian.Uint64(c)
|
||||
if binCount != 2 {
|
||||
t.Fatalf("expected entrycnt to be 2, was %d", binCount)
|
||||
}
|
||||
} else {
|
||||
for _, bin := range poBins {
|
||||
currentBin[1] = bin
|
||||
c, err := ldb.db.Get(currentBin[:])
|
||||
if err != nil {
|
||||
t.Fatalf("expected gc 2 idx to be present: %v", idxKey)
|
||||
}
|
||||
binCount := binary.BigEndian.Uint64(c)
|
||||
binTotal += binCount
|
||||
|
||||
}
|
||||
if binTotal != 3 {
|
||||
t.Fatalf("expected sum of bin indices to be 3, was %d", binTotal)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func waitGc(ctx context.Context, ldb *LDBStore) {
|
||||
<-ldb.gc.runC
|
||||
ldb.gc.runC <- struct{}{}
|
||||
|
@ -196,31 +196,48 @@ func (ls *LocalStore) Close() {
|
||||
|
||||
// Migrate checks the datastore schema vs the runtime schema, and runs migrations if they don't match
|
||||
func (ls *LocalStore) Migrate() error {
|
||||
schema, err := ls.DbStore.GetSchema()
|
||||
actualDbSchema, err := ls.DbStore.GetSchema()
|
||||
if err != nil {
|
||||
log.Error(err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
log.Debug("found schema", "schema", schema, "runtime-schema", CurrentDbSchema)
|
||||
if schema != CurrentDbSchema {
|
||||
// run migrations
|
||||
log.Debug("running migrations for", "schema", actualDbSchema, "runtime-schema", CurrentDbSchema)
|
||||
|
||||
if schema == "" {
|
||||
log.Debug("running migrations for", "schema", schema, "runtime-schema", CurrentDbSchema)
|
||||
if actualDbSchema == CurrentDbSchema {
|
||||
return nil
|
||||
}
|
||||
|
||||
// delete chunks that are not valid, i.e. chunks that do not pass any of the ls.Validators
|
||||
if actualDbSchema == DbSchemaNone {
|
||||
ls.migrateFromNoneToPurity()
|
||||
actualDbSchema = DbSchemaPurity
|
||||
}
|
||||
|
||||
if err := ls.DbStore.PutSchema(actualDbSchema); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if actualDbSchema == DbSchemaPurity {
|
||||
if err := ls.migrateFromPurityToHalloween(); err != nil {
|
||||
return err
|
||||
}
|
||||
actualDbSchema = DbSchemaHalloween
|
||||
}
|
||||
|
||||
if err := ls.DbStore.PutSchema(actualDbSchema); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ls *LocalStore) migrateFromNoneToPurity() {
|
||||
// delete chunks that are not valid, i.e. chunks that do not pass
|
||||
// any of the ls.Validators
|
||||
ls.DbStore.Cleanup(func(c *chunk) bool {
|
||||
return !ls.isValid(c)
|
||||
})
|
||||
|
||||
err := ls.DbStore.PutSchema(DbSchemaPurity)
|
||||
if err != nil {
|
||||
log.Error(err.Error())
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
func (ls *LocalStore) migrateFromPurityToHalloween() error {
|
||||
return ls.DbStore.CleanGCIndex()
|
||||
}
|
||||
|
@ -1,6 +1,17 @@
|
||||
package storage
|
||||
|
||||
// The DB schema we want to use. The actual/current DB schema might differ
|
||||
// until migrations are run.
|
||||
const CurrentDbSchema = DbSchemaHalloween
|
||||
|
||||
// There was a time when we had no schema at all.
|
||||
const DbSchemaNone = ""
|
||||
|
||||
// "purity" is the first formal schema of LevelDB we release together with Swarm 0.3.5
|
||||
const DbSchemaPurity = "purity"
|
||||
|
||||
const CurrentDbSchema = DbSchemaPurity
|
||||
// "halloween" is here because we had a screw in the garbage collector index.
|
||||
// Because of that we had to rebuild the GC index to get rid of erroneous
|
||||
// entries and that takes a long time. This schema is used for bookkeeping,
|
||||
// so rebuild index will run just once.
|
||||
const DbSchemaHalloween = "halloween"
|
||||
|
Loading…
Reference in New Issue
Block a user