swarm/storage: Batched database migration (#18113)

This commit is contained in:
lash 2018-11-15 14:57:03 +01:00 committed by Anton Evangelatov
parent 17d67c5834
commit a6942b9f25
2 changed files with 135 additions and 36 deletions

View File

@ -284,7 +284,7 @@ func getGCIdxValue(index *dpaDBIndex, po uint8, addr Address) []byte {
return val return val
} }
func parseGCIdxKey(key []byte) (byte, []byte) { func parseIdxKey(key []byte) (byte, []byte) {
return key[0], key[1:] return key[0], key[1:]
} }
@ -589,7 +589,7 @@ func (s *LDBStore) CleanGCIndex() error {
it.Seek([]byte{keyGCIdx}) it.Seek([]byte{keyGCIdx})
var gcDeletes int var gcDeletes int
for it.Valid() { for it.Valid() {
rowType, _ := parseGCIdxKey(it.Key()) rowType, _ := parseIdxKey(it.Key())
if rowType != keyGCIdx { if rowType != keyGCIdx {
break break
} }
@ -601,47 +601,113 @@ func (s *LDBStore) CleanGCIndex() error {
if err := s.db.Write(&batch); err != nil { if err := s.db.Write(&batch); err != nil {
return err return err
} }
batch.Reset()
it.Seek([]byte{keyIndex})
var idx dpaDBIndex
var poPtrs [256]uint64
for it.Valid() {
rowType, chunkHash := parseGCIdxKey(it.Key())
if rowType != keyIndex {
break
}
err := decodeIndex(it.Value(), &idx)
if err != nil {
return fmt.Errorf("corrupt index: %v", err)
}
po := s.po(chunkHash)
// if we don't find the data key, remove the entry
dataKey := getDataKey(idx.Idx, po)
_, err = s.db.Get(dataKey)
if err != nil {
log.Warn("deleting inconsistent index (missing data)", "key", chunkHash)
batch.Delete(it.Key())
} else {
gcIdxKey := getGCIdxKey(&idx)
gcIdxData := getGCIdxValue(&idx, po, chunkHash)
batch.Put(gcIdxKey, gcIdxData)
log.Trace("clean ok", "key", chunkHash, "gcKey", gcIdxKey, "gcData", gcIdxData)
okEntryCount++
if idx.Idx > poPtrs[po] {
poPtrs[po] = idx.Idx
}
}
totalEntryCount++
it.Next()
}
it.Release() it.Release()
// corrected po index pointer values
var poPtrs [256]uint64
// set to true if chunk count not on 4096 iteration boundary
var doneIterating bool
// last key index in previous iteration
lastIdxKey := []byte{keyIndex}
// counter for debug output
var cleanBatchCount int
// go through all key index entries
for !doneIterating {
cleanBatchCount++
var idxs []dpaDBIndex
var chunkHashes [][]byte
var pos []uint8
it := s.db.NewIterator()
it.Seek(lastIdxKey)
// 4096 is just a nice number, don't look for any hidden meaning here...
var i int
for i = 0; i < 4096; i++ {
// this really shouldn't happen unless database is empty
// but let's keep it to be safe
if !it.Valid() {
doneIterating = true
break
}
// if it's not keyindex anymore we're done iterating
rowType, chunkHash := parseIdxKey(it.Key())
if rowType != keyIndex {
doneIterating = true
break
}
// decode the retrieved index
var idx dpaDBIndex
err := decodeIndex(it.Value(), &idx)
if err != nil {
return fmt.Errorf("corrupt index: %v", err)
}
po := s.po(chunkHash)
lastIdxKey = it.Key()
// if we don't find the data key, remove the entry
// if we find it, add to the array of new gc indices to create
dataKey := getDataKey(idx.Idx, po)
_, err = s.db.Get(dataKey)
if err != nil {
log.Warn("deleting inconsistent index (missing data)", "key", chunkHash)
batch.Delete(it.Key())
} else {
idxs = append(idxs, idx)
chunkHashes = append(chunkHashes, chunkHash)
pos = append(pos, po)
okEntryCount++
if idx.Idx > poPtrs[po] {
poPtrs[po] = idx.Idx
}
}
totalEntryCount++
it.Next()
}
it.Release()
// flush the key index corrections
err := s.db.Write(&batch)
if err != nil {
return err
}
batch.Reset()
// add correct gc indices
for i, okIdx := range idxs {
gcIdxKey := getGCIdxKey(&okIdx)
gcIdxData := getGCIdxValue(&okIdx, pos[i], chunkHashes[i])
batch.Put(gcIdxKey, gcIdxData)
log.Trace("clean ok", "key", chunkHashes[i], "gcKey", gcIdxKey, "gcData", gcIdxData)
}
// flush them
err = s.db.Write(&batch)
if err != nil {
return err
}
batch.Reset()
log.Debug("clean gc index pass", "batch", cleanBatchCount, "checked", i, "kept", len(idxs))
}
log.Debug("gc cleanup entries", "ok", okEntryCount, "total", totalEntryCount, "batchlen", batch.Len()) log.Debug("gc cleanup entries", "ok", okEntryCount, "total", totalEntryCount, "batchlen", batch.Len())
// lastly add updated entry count
var entryCount [8]byte var entryCount [8]byte
binary.BigEndian.PutUint64(entryCount[:], okEntryCount) binary.BigEndian.PutUint64(entryCount[:], okEntryCount)
batch.Put(keyEntryCnt, entryCount[:]) batch.Put(keyEntryCnt, entryCount[:])
// and add the new po index pointers
var poKey [2]byte var poKey [2]byte
poKey[0] = keyDistanceCnt poKey[0] = keyDistanceCnt
for i, poPtr := range poPtrs { for i, poPtr := range poPtrs {
@ -655,6 +721,7 @@ func (s *LDBStore) CleanGCIndex() error {
} }
} }
// if you made it this far your harddisk has survived. Congratulations
return s.db.Write(&batch) return s.db.Write(&batch)
} }

View File

@ -761,6 +761,38 @@ func TestCleanIndex(t *testing.T) {
t.Fatalf("expected sum of bin indices to be 3, was %d", binTotal) t.Fatalf("expected sum of bin indices to be 3, was %d", binTotal)
} }
} }
// check that the iterator quits properly
chunks, err = mputRandomChunks(ldb, 4100, 4096)
if err != nil {
t.Fatal(err)
}
po = ldb.po(chunks[4099].Address()[:])
dataKey = make([]byte, 10)
dataKey[0] = keyData
dataKey[1] = byte(po)
binary.BigEndian.PutUint64(dataKey[2:], 4099+3)
if _, err := ldb.db.Get(dataKey); err != nil {
t.Fatal(err)
}
if err := ldb.db.Delete(dataKey); err != nil {
t.Fatal(err)
}
if err := ldb.CleanGCIndex(); err != nil {
t.Fatal(err)
}
// entrycount should now be one less of added chunks
c, err = ldb.db.Get(keyEntryCnt)
if err != nil {
t.Fatalf("expected gc 2 idx to be present: %v", idxKey)
}
entryCount = binary.BigEndian.Uint64(c)
if entryCount != 4099+2 {
t.Fatalf("expected entrycnt to be 2, was %d", c)
}
} }
func waitGc(ctx context.Context, ldb *LDBStore) { func waitGc(ctx context.Context, ldb *LDBStore) {