save markSetSize

This commit is contained in:
vyzo 2021-03-05 10:00:17 +02:00
parent aff0f1ed4c
commit 17be7d3919

View File

@ -59,6 +59,10 @@ var (
// all active blocks into the hotstore. // all active blocks into the hotstore.
warmupEpochKey = dstore.NewKey("/splitstore/warmupEpoch") warmupEpochKey = dstore.NewKey("/splitstore/warmupEpoch")
// markSetSizeKey stores the current estimate for the mark set size.
// this is first computed at warmup and updated in every compaction
markSetSizeKey = dstore.NewKey("/splitstore/markSetSize")
log = logging.Logger("splitstore") log = logging.Logger("splitstore")
) )
@ -356,11 +360,22 @@ func (s *SplitStore) Start(chain ChainAccessor) error {
s.warmupEpoch = bytesToEpoch(bs) s.warmupEpoch = bytesToEpoch(bs)
case dstore.ErrNotFound: case dstore.ErrNotFound:
default: default:
return xerrors.Errorf("error loading warmup epoch: %w", err) return xerrors.Errorf("error loading warmup epoch: %w", err)
} }
// load markSetSize from metadata ds
// if none, the splitstore will compute it during warmup and update in every compaction
bs, err = s.ds.Get(markSetSizeKey)
switch err {
case nil:
s.markSetSize = bytesToInt64(bs)
case dstore.ErrNotFound:
default:
return xerrors.Errorf("error loading mark set size: %w", err)
}
log.Infow("starting splitstore", "baseEpoch", s.baseEpoch, "warmupEpoch", s.warmupEpoch) log.Infow("starting splitstore", "baseEpoch", s.baseEpoch, "warmupEpoch", s.warmupEpoch)
// watch the chain // watch the chain
@ -493,7 +508,7 @@ func (s *SplitStore) warmup(curTs *types.TipSet) {
} }
if count > s.markSetSize { if count > s.markSetSize {
s.markSetSize = count s.markSetSize = count + count>>2 // overestimate a bit
} }
// save the warmup epoch // save the warmup epoch
@ -502,6 +517,11 @@ func (s *SplitStore) warmup(curTs *types.TipSet) {
if err != nil { if err != nil {
log.Errorf("error saving warmup epoch: %s", err) log.Errorf("error saving warmup epoch: %s", err)
} }
err = s.ds.Put(markSetSizeKey, int64ToBytes(s.markSetSize))
if err != nil {
log.Errorf("error saving mark set size: %s", err)
}
} }
// Compaction/GC Algorithm // Compaction/GC Algorithm
@ -535,7 +555,7 @@ func (s *SplitStore) estimateMarkSetSize(curTs *types.TipSet) {
panic(err) panic(err)
} }
s.markSetSize = count + count>>2 s.markSetSize = count + count>>2 // overestimate a bit
} }
func (s *SplitStore) compactSimple(curTs *types.TipSet) { func (s *SplitStore) compactSimple(curTs *types.TipSet) {
@ -575,7 +595,7 @@ func (s *SplitStore) compactSimple(curTs *types.TipSet) {
} }
if count > s.markSetSize { if count > s.markSetSize {
s.markSetSize = count + count>>2 s.markSetSize = count + count>>2 // overestimate a bit
} }
log.Infow("marking done", "took", time.Since(startMark)) log.Infow("marking done", "took", time.Since(startMark))
@ -669,6 +689,11 @@ func (s *SplitStore) compactSimple(curTs *types.TipSet) {
// TODO do something better here // TODO do something better here
panic(err) panic(err)
} }
err = s.ds.Put(markSetSizeKey, int64ToBytes(s.markSetSize))
if err != nil {
log.Errorf("error saving mark set size: %s", err)
}
} }
func (s *SplitStore) moveColdBlocks(cold []cid.Cid) error { func (s *SplitStore) moveColdBlocks(cold []cid.Cid) error {
@ -790,7 +815,7 @@ func (s *SplitStore) compactFull(curTs *types.TipSet) {
} }
if count > s.markSetSize { if count > s.markSetSize {
s.markSetSize = count + count>>2 s.markSetSize = count + count>>2 // overestimate a bit
} }
// Phase 1b: mark all reachable CIDs in the cold range // Phase 1b: mark all reachable CIDs in the cold range
@ -813,7 +838,7 @@ func (s *SplitStore) compactFull(curTs *types.TipSet) {
} }
if count > s.markSetSize { if count > s.markSetSize {
s.markSetSize = count + count>>2 s.markSetSize = count + count>>2 // overestimate a bit
} }
log.Infow("marking done", "took", time.Since(startMark)) log.Infow("marking done", "took", time.Since(startMark))
@ -958,6 +983,11 @@ func (s *SplitStore) compactFull(curTs *types.TipSet) {
// TODO do something better here // TODO do something better here
panic(err) panic(err)
} }
err = s.ds.Put(markSetSizeKey, int64ToBytes(s.markSetSize))
if err != nil {
log.Errorf("error saving mark set size: %s", err)
}
} }
func (s *SplitStore) setBaseEpoch(epoch abi.ChainEpoch) error { func (s *SplitStore) setBaseEpoch(epoch abi.ChainEpoch) error {
@ -967,15 +997,28 @@ func (s *SplitStore) setBaseEpoch(epoch abi.ChainEpoch) error {
} }
func epochToBytes(epoch abi.ChainEpoch) []byte { func epochToBytes(epoch abi.ChainEpoch) []byte {
buf := make([]byte, 16) return uint64ToBytes(uint64(epoch))
n := binary.PutUvarint(buf, uint64(epoch))
return buf[:n]
} }
func bytesToEpoch(buf []byte) abi.ChainEpoch { func bytesToEpoch(buf []byte) abi.ChainEpoch {
epoch, n := binary.Uvarint(buf) return abi.ChainEpoch(bytesToUint64(buf))
if n < 0 { }
panic("bogus base epoch bytes")
} func int64ToBytes(i int64) []byte {
return abi.ChainEpoch(epoch) return uint64ToBytes(uint64(i))
}
func bytesToInt64(buf []byte) int64 {
return int64(bytesToUint64(buf))
}
func uint64ToBytes(i uint64) []byte {
buf := make([]byte, 16)
n := binary.PutUvarint(buf, i)
return buf[:n]
}
func bytesToUint64(buf []byte) uint64 {
i, _ := binary.Uvarint(buf)
return i
} }