From a8aa89accb0bcd4f24cd430d8f100e0ff9e239d0 Mon Sep 17 00:00:00 2001 From: Anton Evangelatov Date: Mon, 20 Aug 2018 14:10:30 +0200 Subject: [PATCH] swarm/storage: cleanup task - remove bigger chunks (#17424) --- swarm/storage/ldbstore.go | 67 ++++++++++++++++++++++++++------------- 1 file changed, 45 insertions(+), 22 deletions(-) diff --git a/swarm/storage/ldbstore.go b/swarm/storage/ldbstore.go index 7920ee767..b95aa13b0 100644 --- a/swarm/storage/ldbstore.go +++ b/swarm/storage/ldbstore.go @@ -36,6 +36,7 @@ import ( "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/log" "github.com/ethereum/go-ethereum/swarm/storage/mock" "github.com/syndtr/goleveldb/leveldb" @@ -384,14 +385,13 @@ func (s *LDBStore) Import(in io.Reader) (int64, error) { } func (s *LDBStore) Cleanup() { - //Iterates over the database and checks that there are no faulty chunks + //Iterates over the database and checks that there are no chunks bigger than 4kb + var errorsFound, removed, total int + it := s.db.NewIterator() - startPosition := []byte{keyIndex} - it.Seek(startPosition) - var key []byte - var errorsFound, total int - for it.Valid() { - key = it.Key() + defer it.Release() + for ok := it.Seek([]byte{keyIndex}); ok; ok = it.Next() { + key := it.Key() if (key == nil) || (key[0] != keyIndex) { break } @@ -399,27 +399,50 @@ func (s *LDBStore) Cleanup() { var index dpaDBIndex err := decodeIndex(it.Value(), &index) if err != nil { - it.Next() + log.Warn("Cannot decode") + errorsFound++ continue } - data, err := s.db.Get(getDataKey(index.Idx, s.po(Address(key[1:])))) + hash := key[1:] + po := s.po(hash) + datakey := getDataKey(index.Idx, po) + data, err := s.db.Get(datakey) if err != nil { - log.Warn(fmt.Sprintf("Chunk %x found but could not be accessed: %v", key[:], err)) - s.delete(index.Idx, getIndexKey(key[1:]), s.po(Address(key[1:]))) - errorsFound++ - } else { - hasher := s.hashfunc() - hasher.Write(data[32:]) - hash := hasher.Sum(nil) - if !bytes.Equal(hash, key[1:]) { - log.Warn(fmt.Sprintf("Found invalid chunk. Hash mismatch. hash=%x, key=%x", hash, key[:])) - s.delete(index.Idx, getIndexKey(key[1:]), s.po(Address(key[1:]))) + found := false + + // highest possible proximity is 255 + for po = 1; po <= 255; po++ { + datakey = getDataKey(index.Idx, po) + data, err = s.db.Get(datakey) + if err == nil { + found = true + break + } + } + + if !found { + log.Warn(fmt.Sprintf("Chunk %x found but count not be accessed with any po", key[:])) + errorsFound++ + continue } } - it.Next() + + c := &Chunk{} + ck := data[:32] + decodeData(data, c) + + cs := int64(binary.LittleEndian.Uint64(c.SData[:8])) + log.Trace("chunk", "key", fmt.Sprintf("%x", key[:]), "ck", fmt.Sprintf("%x", ck), "dkey", fmt.Sprintf("%x", datakey), "dataidx", index.Idx, "po", po, "len data", len(data), "len sdata", len(c.SData), "size", cs) + + if len(c.SData) > chunk.DefaultSize+8 { + log.Warn("chunk for cleanup", "key", fmt.Sprintf("%x", key[:]), "ck", fmt.Sprintf("%x", ck), "dkey", fmt.Sprintf("%x", datakey), "dataidx", index.Idx, "po", po, "len data", len(data), "len sdata", len(c.SData), "size", cs) + s.delete(index.Idx, getIndexKey(key[1:]), po) + removed++ + errorsFound++ + } } - it.Release() - log.Warn(fmt.Sprintf("Found %v errors out of %v entries", errorsFound, total)) + + log.Warn(fmt.Sprintf("Found %v errors out of %v entries. Removed %v chunks.", errorsFound, total, removed)) } func (s *LDBStore) ReIndex() {