keep track of the compaction serial (index)
it is useful so that: - we only do slow (but very effective) moving gc every 10 compactions - we can detect a splitstore v0 upgrade and re-warm up
This commit is contained in:
parent
c93328b036
commit
818b8de182
@ -72,6 +72,9 @@ var (
|
|||||||
// this is first computed at warmup and updated in every compaction
|
// this is first computed at warmup and updated in every compaction
|
||||||
markSetSizeKey = dstore.NewKey("/splitstore/markSetSize")
|
markSetSizeKey = dstore.NewKey("/splitstore/markSetSize")
|
||||||
|
|
||||||
|
// compactionIndexKey stores the compaction index (serial number)
|
||||||
|
compactionIndexKey = dstore.NewKey("/splitstore/compactionIndex")
|
||||||
|
|
||||||
log = logging.Logger("splitstore")
|
log = logging.Logger("splitstore")
|
||||||
|
|
||||||
// used to signal end of walk
|
// used to signal end of walk
|
||||||
@ -140,6 +143,8 @@ type SplitStore struct {
|
|||||||
markSetEnv MarkSetEnv
|
markSetEnv MarkSetEnv
|
||||||
markSetSize int64
|
markSetSize int64
|
||||||
|
|
||||||
|
compactionIndex int64
|
||||||
|
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
cancel func()
|
cancel func()
|
||||||
|
|
||||||
@ -530,6 +535,17 @@ func (s *SplitStore) Start(chain ChainAccessor) error {
|
|||||||
return xerrors.Errorf("error loading mark set size: %w", err)
|
return xerrors.Errorf("error loading mark set size: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// load compactionIndex from metadata ds to provide a hint as to when to perform moving gc
|
||||||
|
bs, err = s.ds.Get(compactionIndexKey)
|
||||||
|
switch err {
|
||||||
|
case nil:
|
||||||
|
s.compactionIndex = bytesToInt64(bs)
|
||||||
|
|
||||||
|
case dstore.ErrNotFound:
|
||||||
|
default:
|
||||||
|
return xerrors.Errorf("error loading compaction index: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
log.Infow("starting splitstore", "baseEpoch", s.baseEpoch, "warmupEpoch", s.warmupEpoch)
|
log.Infow("starting splitstore", "baseEpoch", s.baseEpoch, "warmupEpoch", s.warmupEpoch)
|
||||||
|
|
||||||
// watch the chain
|
// watch the chain
|
||||||
@ -977,7 +993,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
|
|||||||
currentEpoch := curTs.Height()
|
currentEpoch := curTs.Height()
|
||||||
boundaryEpoch := currentEpoch - CompactionBoundary
|
boundaryEpoch := currentEpoch - CompactionBoundary
|
||||||
|
|
||||||
log.Infow("running compaction", "currentEpoch", currentEpoch, "baseEpoch", s.baseEpoch, "boundaryEpoch", boundaryEpoch)
|
log.Infow("running compaction", "currentEpoch", currentEpoch, "baseEpoch", s.baseEpoch, "boundaryEpoch", boundaryEpoch, "compactionIndex", s.compactionIndex)
|
||||||
|
|
||||||
markSet, err := s.markSetEnv.Create("live", s.markSetSize)
|
markSet, err := s.markSetEnv.Create("live", s.markSetSize)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -994,7 +1010,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
|
|||||||
s.beginTxnMarking(markSet)
|
s.beginTxnMarking(markSet)
|
||||||
|
|
||||||
// 1. mark reachable objects by walking the chain from the current epoch; we keep state roots
|
// 1. mark reachable objects by walking the chain from the current epoch; we keep state roots
|
||||||
// and messages until the boundary epoch.
|
// and messages until the boundary epoch.nn
|
||||||
log.Info("marking reachable objects")
|
log.Info("marking reachable objects")
|
||||||
startMark := time.Now()
|
startMark := time.Now()
|
||||||
|
|
||||||
@ -1147,6 +1163,12 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
|
|||||||
return xerrors.Errorf("error saving mark set size: %w", err)
|
return xerrors.Errorf("error saving mark set size: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
s.compactionIndex++
|
||||||
|
err = s.ds.Put(compactionIndexKey, int64ToBytes(s.compactionIndex))
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("error saving compaction index: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -7,6 +7,11 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func (s *SplitStore) gcHotstore() {
|
func (s *SplitStore) gcHotstore() {
|
||||||
|
// we only perform moving gc every 10 compactions as it can take a while
|
||||||
|
if s.compactionIndex%10 != 0 {
|
||||||
|
goto online_gc
|
||||||
|
}
|
||||||
|
|
||||||
// check if the hotstore is movable; if so, move it.
|
// check if the hotstore is movable; if so, move it.
|
||||||
if mover, ok := s.hot.(bstore.BlockstoreMover); ok {
|
if mover, ok := s.hot.(bstore.BlockstoreMover); ok {
|
||||||
log.Info("moving hotstore")
|
log.Info("moving hotstore")
|
||||||
@ -14,13 +19,15 @@ func (s *SplitStore) gcHotstore() {
|
|||||||
err := mover.MoveTo("", nil)
|
err := mover.MoveTo("", nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warnf("error moving hotstore: %s", err)
|
log.Warnf("error moving hotstore: %s", err)
|
||||||
return
|
// try online gc
|
||||||
|
goto online_gc
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Infof("moving hotstore done", "took", time.Since(startMove))
|
log.Infow("moving hotstore done", "took", time.Since(startMove))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
online_gc:
|
||||||
// check if the hotstore supports online GC; if so, GC it.
|
// check if the hotstore supports online GC; if so, GC it.
|
||||||
if gc, ok := s.hot.(bstore.BlockstoreGC); ok {
|
if gc, ok := s.hot.(bstore.BlockstoreGC); ok {
|
||||||
log.Info("garbage collecting hotstore")
|
log.Info("garbage collecting hotstore")
|
||||||
|
Loading…
Reference in New Issue
Block a user