always do full walks, not only when there is a sync gap

vyzo 2021-06-21 14:53:56 +03:00
parent 30dbe4978b
commit 0390285c4e
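Before this change, compaction only performed the expensive full chain walk when a sync gap had previously been recorded; in the common case it marked from a single tipset at the boundary epoch. The diff below removes the sync-gap tracking entirely (the datastore key, the struct field, the persistence in HeadChange, and the branch in doCompact), so marking now always does the full walk. A sketch of the resulting walk call follows the marking hunk below.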


@@ -69,11 +69,6 @@ var (
 	// all active blocks into the hotstore.
 	warmupEpochKey = dstore.NewKey("/splitstore/warmupEpoch")

-	// syncGapEpochKey stores the last epoch where a sync gap was detected.
-	// If there is a sync gap after the boundary epoch, compaction will perform
-	// a slower full walk from the current epoch to the boundary epoch
-	syncGapEpochKey = dstore.NewKey("/splitstore/syncGapEpoch")
-
 	// markSetSizeKey stores the current estimate for the mark set size.
 	// this is first computed at warmup and updated in every compaction
 	markSetSizeKey = dstore.NewKey("/splitstore/markSetSize")
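For orientation, a minimal sketch of the splitstore metadata keys that remain after this removal. Only warmupEpochKey and markSetSizeKey are visible in this hunk; baseEpochKey is inferred from setBaseEpoch in the last hunk, so its exact path is an assumption.

// Sketch only: persisted splitstore metadata after this commit.
var (
	baseEpochKey   = dstore.NewKey("/splitstore/baseEpoch")   // assumed path; used by setBaseEpoch below
	warmupEpochKey = dstore.NewKey("/splitstore/warmupEpoch") // set once the hotstore warmup completes
	markSetSizeKey = dstore.NewKey("/splitstore/markSetSize") // mark set size estimate, updated on each compaction
)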
@@ -120,10 +115,9 @@ type SplitStore struct {
 	cfg *Config

 	baseEpoch   abi.ChainEpoch
-	syncGapEpoch abi.ChainEpoch
 	warmupEpoch abi.ChainEpoch
 	writeEpoch  abi.ChainEpoch

 	coldPurgeSize int
@@ -382,17 +376,6 @@ func (s *SplitStore) Start(chain ChainAccessor) error {
 		return xerrors.Errorf("error loading warmup epoch: %w", err)
 	}

-	// load sync gap epoch from metadata ds
-	bs, err = s.ds.Get(syncGapEpochKey)
-	switch err {
-	case nil:
-		s.syncGapEpoch = bytesToEpoch(bs)
-
-	case dstore.ErrNotFound:
-	default:
-		return xerrors.Errorf("error loading sync gap epoch: %w", err)
-	}
-
 	// load markSetSize from metadata ds
 	// if none, the splitstore will compute it during warmup and update in every compaction
 	bs, err = s.ds.Get(markSetSizeKey)
@@ -447,11 +430,6 @@ func (s *SplitStore) HeadChange(_, apply []*types.TipSet) error {
 	timestamp := time.Unix(int64(curTs.MinTimestamp()), 0)
 	if time.Since(timestamp) > SyncGapTime {
-		err := s.setSyncGapEpoch(epoch)
-		if err != nil {
-			log.Warnf("error saving sync gap epoch: %s", err)
-		}
-
 		// don't attempt compaction before we have caught up syncing
 		return nil
 	}
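With setSyncGapEpoch gone, the sync-gap check in HeadChange is reduced to a pure guard. A minimal sketch of its post-change shape, assembled from the context lines of this hunk:

// Sketch only: if the head tipset's timestamp lags wall-clock time by more than
// SyncGapTime, the node is still catching up, so compaction is simply skipped;
// the gap is no longer persisted anywhere.
timestamp := time.Unix(int64(curTs.MinTimestamp()), 0)
if time.Since(timestamp) > SyncGapTime {
	// don't attempt compaction before we have caught up syncing
	return nil
}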
@@ -469,7 +447,7 @@ func (s *SplitStore) HeadChange(_, apply []*types.TipSet) error {
 		log.Info("compacting splitstore")
 		start := time.Now()

-		s.compact(curTs, s.syncGapEpoch)
+		s.compact(curTs)

 		log.Infow("compaction done", "took", time.Since(start))
 	}()
@@ -697,7 +675,7 @@ func (s *SplitStore) doWarmup(curTs *types.TipSet) error {
 }

 // Compaction/GC Algorithm
-func (s *SplitStore) compact(curTs *types.TipSet, syncGapEpoch abi.ChainEpoch) {
+func (s *SplitStore) compact(curTs *types.TipSet) {
 	var err error
 	if s.markSetSize == 0 {
 		start := time.Now()
@@ -713,7 +691,7 @@ func (s *SplitStore) compact(curTs *types.TipSet, syncGapEpoch abi.ChainEpoch) {
 	}

 	start := time.Now()
-	err = s.doCompact(curTs, syncGapEpoch)
+	err = s.doCompact(curTs)
 	took := time.Since(start).Milliseconds()
 	stats.Record(context.Background(), metrics.SplitstoreCompactionTimeSeconds.M(float64(took)/1e3))
@@ -740,7 +718,7 @@ func (s *SplitStore) estimateMarkSetSize(curTs *types.TipSet) error {
 	return nil
 }

-func (s *SplitStore) doCompact(curTs *types.TipSet, syncGapEpoch abi.ChainEpoch) error {
+func (s *SplitStore) doCompact(curTs *types.TipSet) error {
 	currentEpoch := curTs.Height()
 	boundaryEpoch := currentEpoch - CompactionBoundary
 	coldEpoch := boundaryEpoch - CompactionSlack
@@ -757,30 +735,8 @@ func (s *SplitStore) doCompact(curTs *types.TipSet, syncGapEpoch abi.ChainEpoch) error {
 	log.Infow("marking reachable blocks", "currentEpoch", currentEpoch, "boundaryEpoch", boundaryEpoch)
 	startMark := time.Now()

-	var inclMsgs bool
-	var markTs *types.TipSet
-	if syncGapEpoch > boundaryEpoch {
-		// There is a sync gap that may have caused writes that are logically after the boundary
-		// epoch to be written before the respective head change notification and hence be tracked
-		// at the wrong epoch.
-		// This can happen if the node is offline or falls out of sync for an extended period of time.
-		// In this case we perform a full walk to avoid pathologies with pushing actually hot
-		// objects into the coldstore.
-		markTs = curTs
-		inclMsgs = true
-		log.Infof("sync gap detected at epoch %d; marking from current epoch to boundary epoch", syncGapEpoch)
-	} else {
-		// There is no pathological sync gap, so we can use the much faster single tipset walk at
-		// the boundary epoch.
-		boundaryTs, err := s.chain.GetTipsetByHeight(context.Background(), boundaryEpoch, curTs, true)
-		if err != nil {
-			return xerrors.Errorf("error getting tipset at boundary epoch: %w", err)
-		}
-		markTs = boundaryTs
-	}
-
 	var count int64
-	err = s.walk(markTs, boundaryEpoch, inclMsgs,
+	err = s.walk(curTs, boundaryEpoch, true,
 		func(cid cid.Cid) error {
 			count++
 			return markSet.Mark(cid)
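This is the hunk the commit title refers to: marking now always pays for the full walk, the trade-off the removed comments describe (slower than the single-tipset path, but it cannot misclassify hot objects written during a sync gap as cold). A sketch of the unconditional walk, with the callback as shown above:

// Sketch only: the marking phase after this commit. s.walk traverses the chain
// from the current head (curTs) down to boundaryEpoch; the `true` argument
// includes messages in the walk, and every reachable cid goes into the mark set.
var count int64
err = s.walk(curTs, boundaryEpoch, true,
	func(c cid.Cid) error {
		count++
		return markSet.Mark(c)
	})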
@@ -1100,11 +1056,6 @@ func (s *SplitStore) setBaseEpoch(epoch abi.ChainEpoch) error {
 	return s.ds.Put(baseEpochKey, epochToBytes(epoch))
 }

-func (s *SplitStore) setSyncGapEpoch(epoch abi.ChainEpoch) error {
-	s.syncGapEpoch = epoch
-	return s.ds.Put(syncGapEpochKey, epochToBytes(epoch))
-}
-
 func epochToBytes(epoch abi.ChainEpoch) []byte {
 	return uint64ToBytes(uint64(epoch))
 }