kill full splitstore compaction, simplify splitstore configuration

vyzo 2021-06-14 20:19:06 +03:00
parent 5cca29d1db
commit 04f2e102a1
3 changed files with 12 additions and 273 deletions


@@ -86,17 +86,6 @@ type Config struct {
 	//
 	// Supported values are: "bloom" (default if omitted), "bolt".
 	MarkSetType string
-	// perform full reachability analysis (expensive) for compaction
-	// You should enable this option if you plan to use the splitstore without a backing coldstore
-	EnableFullCompaction bool
-	// EXPERIMENTAL enable pruning of unreachable objects.
-	// This has not been sufficiently tested yet; only enable if you know what you are doing.
-	// Only applies if you enable full compaction.
-	EnableGC bool
-	// full archival nodes should enable this if EnableFullCompaction is enabled
-	// do NOT enable this if you synced from a snapshot.
-	// Only applies if you enabled full compaction
-	Archival bool
 }

 // ChainAccessor allows the Splitstore to access the chain. It will most likely
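
With the full-compaction knobs gone, the exported configuration reduces to two fields. A sketch of the post-commit struct, reconstructed from the context lines above and the wiring hunk below (the TrackingStoreType doc comment is an assumption, not copied from the source):

	type Config struct {
		// TrackingStoreType is the type of tracking store to use.
		//
		// Supported values are: "bolt" (assumed default if omitted).
		TrackingStoreType string

		// MarkSetType is the type of mark set to use.
		//
		// Supported values are: "bloom" (default if omitted), "bolt".
		MarkSetType string
	}
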
@@ -113,16 +102,10 @@ type SplitStore struct {
 	critsection int32 // compaction critical section
 	closing     int32 // the split store is closing

-	fullCompaction  bool
-	enableGC        bool
-	skipOldMsgs     bool
-	skipMsgReceipts bool
-
 	baseEpoch   abi.ChainEpoch
 	warmupEpoch abi.ChainEpoch

 	coldPurgeSize int
-	deadPurgeSize int

 	mx    sync.Mutex
 	curTs *types.TipSet
@@ -165,18 +148,9 @@ func Open(path string, ds dstore.Datastore, hot, cold bstore.Blockstore, cfg *Co
 		tracker: tracker,
 		env:     env,

-		fullCompaction:  cfg.EnableFullCompaction,
-		enableGC:        cfg.EnableGC,
-		skipOldMsgs:     !(cfg.EnableFullCompaction && cfg.Archival),
-		skipMsgReceipts: !(cfg.EnableFullCompaction && cfg.Archival),
-
 		coldPurgeSize: defaultColdPurgeSize,
 	}

-	if cfg.EnableGC {
-		ss.deadPurgeSize = defaultDeadPurgeSize
-	}
-
 	return ss, nil
 }
@@ -465,7 +439,7 @@ func (s *SplitStore) warmup(curTs *types.TipSet) {
 	batchSnoop := make([]cid.Cid, 0, batchSize)

 	count := int64(0)
-	err := s.chain.WalkSnapshot(context.Background(), curTs, 1, s.skipOldMsgs, s.skipMsgReceipts,
+	err := s.chain.WalkSnapshot(context.Background(), curTs, 1, true, true,
 		func(cid cid.Cid) error {
 			count++
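
The warmup walk used to honor s.skipOldMsgs and s.skipMsgReceipts, which were only ever false for archival nodes running full compaction; with that mode gone, both flags are hardcoded to true, so old messages and message receipts are never walked. For reference, the shape of the walker as these call sites use it (a sketch; the parameter names are assumptions inferred from the removed struct fields, not the chain package's actual declaration):

	// Assumed shape of the snapshot walker used above; names are guesses.
	type snapshotWalker interface {
		WalkSnapshot(ctx context.Context, ts *types.TipSet,
			inclRecentRoots abi.ChainEpoch, // 1 at the call sites in this file
			skipOldMsgs bool, // now always true: old messages are skipped
			skipMsgReceipts bool, // now always true: receipts are skipped
			cb func(cid.Cid) error) error
	}
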
@@ -556,11 +530,7 @@ func (s *SplitStore) compact(curTs *types.TipSet) {
 	}

 	start := time.Now()
-	if s.fullCompaction {
-		err = s.compactFull(curTs)
-	} else {
-		err = s.compactSimple(curTs)
-	}
+	err = s.doCompact(curTs)
 	took := time.Since(start).Milliseconds()
 	stats.Record(context.Background(), metrics.SplitstoreCompactionTimeSeconds.M(float64(took)/1e3))
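
Note the unit conversion around the call: took is captured in milliseconds, and the metric is SplitstoreCompactionTimeSeconds, so the value is divided by 1e3 before recording. A standalone illustration of the same pattern:

	package main

	import (
		"fmt"
		"time"
	)

	func main() {
		start := time.Now()
		time.Sleep(1500 * time.Millisecond) // stand-in for doCompact
		took := time.Since(start).Milliseconds()
		fmt.Printf("%.3fs\n", float64(took)/1e3) // ms to s, prints ~1.500s
	}
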
@@ -571,7 +541,7 @@
 func (s *SplitStore) estimateMarkSetSize(curTs *types.TipSet) error {
 	var count int64
-	err := s.chain.WalkSnapshot(context.Background(), curTs, 1, s.skipOldMsgs, s.skipMsgReceipts,
+	err := s.chain.WalkSnapshot(context.Background(), curTs, 1, true, true,
 		func(cid cid.Cid) error {
 			count++
 			return nil
@@ -585,12 +555,12 @@ func (s *SplitStore) estimateMarkSetSize(curTs *types.TipSet) error {
 	return nil
 }

-func (s *SplitStore) compactSimple(curTs *types.TipSet) error {
+func (s *SplitStore) doCompact(curTs *types.TipSet) error {
 	coldEpoch := s.baseEpoch + CompactionCold
 	currentEpoch := curTs.Height()
 	boundaryEpoch := currentEpoch - CompactionBoundary

-	log.Infow("running simple compaction", "currentEpoch", currentEpoch, "baseEpoch", s.baseEpoch, "coldEpoch", coldEpoch, "boundaryEpoch", boundaryEpoch)
+	log.Infow("running compaction", "currentEpoch", currentEpoch, "baseEpoch", s.baseEpoch, "coldEpoch", coldEpoch, "boundaryEpoch", boundaryEpoch)

 	coldSet, err := s.env.Create("cold", s.markSetSize)
 	if err != nil {
@@ -608,7 +578,7 @@ func (s *SplitStore) compactSimple(curTs *types.TipSet) error {
 	}

 	var count int64
-	err = s.chain.WalkSnapshot(context.Background(), boundaryTs, 1, s.skipOldMsgs, s.skipMsgReceipts,
+	err = s.chain.WalkSnapshot(context.Background(), boundaryTs, 1, true, true,
 		func(cid cid.Cid) error {
 			count++
 			return coldSet.Mark(cid)
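
As in the removed code below, the walk count feeds the next mark-set size with a 25% safety margin: count + count>>2 parses as count + (count>>2), since >> binds tighter than + in Go. A quick check:

	package main

	import "fmt"

	func main() {
		count := int64(1_000_000)
		// the same padding the splitstore applies when sizing mark sets
		fmt.Println(count + count>>2) // 1250000
	}
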
@@ -826,231 +796,6 @@ func (s *SplitStore) gcHotstore() {
 	}
 }

-func (s *SplitStore) compactFull(curTs *types.TipSet) error {
-	currentEpoch := curTs.Height()
-	coldEpoch := s.baseEpoch + CompactionCold
-	boundaryEpoch := currentEpoch - CompactionBoundary
-
-	log.Infow("running full compaction", "currentEpoch", currentEpoch, "baseEpoch", s.baseEpoch, "coldEpoch", coldEpoch, "boundaryEpoch", boundaryEpoch)
-
-	// create two mark sets, one for marking the cold finality region
-	// and one for marking the hot region
-	hotSet, err := s.env.Create("hot", s.markSetSize)
-	if err != nil {
-		return xerrors.Errorf("error creating hot mark set: %w", err)
-	}
-	defer hotSet.Close() //nolint:errcheck
-
-	coldSet, err := s.env.Create("cold", s.markSetSize)
-	if err != nil {
-		return xerrors.Errorf("error creating cold mark set: %w", err)
-	}
-	defer coldSet.Close() //nolint:errcheck
-
-	// Phase 1: marking
-	log.Info("marking live blocks")
-	startMark := time.Now()
-
-	// Phase 1a: mark all reachable CIDs in the hot range
-	boundaryTs, err := s.chain.GetTipsetByHeight(context.Background(), boundaryEpoch, curTs, true)
-	if err != nil {
-		return xerrors.Errorf("error getting tipset at boundary epoch: %w", err)
-	}
-
-	count := int64(0)
-	err = s.chain.WalkSnapshot(context.Background(), boundaryTs, boundaryEpoch-coldEpoch, s.skipOldMsgs, s.skipMsgReceipts,
-		func(cid cid.Cid) error {
-			count++
-			return hotSet.Mark(cid)
-		})
-
-	if err != nil {
-		return xerrors.Errorf("error marking hot blocks: %w", err)
-	}
-
-	if count > s.markSetSize {
-		s.markSetSize = count + count>>2 // overestimate a bit
-	}
-
-	// Phase 1b: mark all reachable CIDs in the cold range
-	coldTs, err := s.chain.GetTipsetByHeight(context.Background(), coldEpoch, curTs, true)
-	if err != nil {
-		return xerrors.Errorf("error getting tipset at cold epoch: %w", err)
-	}
-
-	count = 0
-	err = s.chain.WalkSnapshot(context.Background(), coldTs, CompactionCold, s.skipOldMsgs, s.skipMsgReceipts,
-		func(cid cid.Cid) error {
-			count++
-			return coldSet.Mark(cid)
-		})
-
-	if err != nil {
-		return xerrors.Errorf("error marking cold blocks: %w", err)
-	}
-
-	if count > s.markSetSize {
-		s.markSetSize = count + count>>2 // overestimate a bit
-	}
-
-	log.Infow("marking done", "took", time.Since(startMark))
-
-	// Phase 2: sweep cold objects:
-	// - If a cold object is reachable in the hot range, it stays in the hotstore.
-	// - If a cold object is reachable in the cold range, it is moved to the coldstore.
-	// - If a cold object is unreachable, it is deleted if GC is enabled, otherwise moved to the coldstore.
-	log.Info("collecting cold objects")
-	startCollect := time.Now()
-
-	// some stats for logging
-	var hotCnt, coldCnt, deadCnt int
-
-	cold := make([]cid.Cid, 0, s.coldPurgeSize)
-	dead := make([]cid.Cid, 0, s.deadPurgeSize)
-
-	// 2.1 iterate through the tracker and collect cold and dead objects
-	err = s.tracker.ForEach(func(cid cid.Cid, wrEpoch abi.ChainEpoch) error {
-		// is the object stil hot?
-		if wrEpoch > coldEpoch {
-			// yes, stay in the hotstore
-			hotCnt++
-			return nil
-		}
-
-		// the object is cold -- check whether it is reachable in the hot range
-		mark, err := hotSet.Has(cid)
-		if err != nil {
-			return xerrors.Errorf("error checking live mark for %s: %w", cid, err)
-		}
-
-		if mark {
-			// the object is reachable in the hot range, stay in the hotstore
-			hotCnt++
-			return nil
-		}
-
-		// check whether it is reachable in the cold range
-		mark, err = coldSet.Has(cid)
-		if err != nil {
-			return xerrors.Errorf("error checkiing cold set for %s: %w", cid, err)
-		}
-
-		if s.enableGC {
-			if mark {
-				// the object is reachable in the cold range, move it to the cold store
-				cold = append(cold, cid)
-				coldCnt++
-			} else {
-				// the object is dead and will be deleted
-				dead = append(dead, cid)
-				deadCnt++
-			}
-		} else {
-			// if GC is disabled, we move both cold and dead objects to the coldstore
-			cold = append(cold, cid)
-			if mark {
-				coldCnt++
-			} else {
-				deadCnt++
-			}
-		}
-
-		return nil
-	})
-
-	if err != nil {
-		return xerrors.Errorf("error collecting cold objects: %w", err)
-	}
-
-	if coldCnt > 0 {
-		s.coldPurgeSize = coldCnt + coldCnt>>2 // overestimate a bit
-	}
-	if deadCnt > 0 {
-		s.deadPurgeSize = deadCnt + deadCnt>>2 // overestimate a bit
-	}
-
-	log.Infow("collection done", "took", time.Since(startCollect))
-	log.Infow("compaction stats", "hot", hotCnt, "cold", coldCnt, "dead", deadCnt)
-	stats.Record(context.Background(), metrics.SplitstoreCompactionHot.M(int64(hotCnt)))
-	stats.Record(context.Background(), metrics.SplitstoreCompactionCold.M(int64(coldCnt)))
-	stats.Record(context.Background(), metrics.SplitstoreCompactionDead.M(int64(deadCnt)))
-
-	// Enter critical section
-	atomic.StoreInt32(&s.critsection, 1)
-	defer atomic.StoreInt32(&s.critsection, 0)
-
-	// check to see if we are closing first; if that's the case just return
-	if atomic.LoadInt32(&s.closing) == 1 {
-		log.Info("splitstore is closing; aborting compaction")
-		return xerrors.Errorf("compaction aborted")
-	}
-
-	// 2.2 copy the cold objects to the coldstore
-	log.Info("moving cold objects to the coldstore")
-	startMove := time.Now()
-	err = s.moveColdBlocks(cold)
-	if err != nil {
-		return xerrors.Errorf("error moving cold blocks: %w", err)
-	}
-	log.Infow("moving done", "took", time.Since(startMove))
-
-	// 2.3 delete cold objects from the hotstore
-	log.Info("purging cold objects from the hotstore")
-	startPurge := time.Now()
-	err = s.purgeBlocks(cold)
-	if err != nil {
-		return xerrors.Errorf("error purging cold blocks: %w", err)
-	}
-	log.Infow("purging cold from hotstore done", "took", time.Since(startPurge))
-
-	// 2.4 remove the tracker tracking for cold objects
-	startPurge = time.Now()
-	log.Info("purging cold objects from tracker")
-	err = s.purgeTracking(cold)
-	if err != nil {
-		return xerrors.Errorf("error purging tracking for cold blocks: %w", err)
-	}
-	log.Infow("purging cold from tracker done", "took", time.Since(startPurge))
-
-	// 3. if we have dead objects, delete them from the hotstore and remove the tracking
-	if len(dead) > 0 {
-		log.Info("deleting dead objects")
-		err = s.purgeBlocks(dead)
-		if err != nil {
-			return xerrors.Errorf("error purging dead blocks: %w", err)
-		}
-
-		// remove the tracker tracking
-		startPurge := time.Now()
-		log.Info("purging dead objects from tracker")
-		err = s.purgeTracking(dead)
-		if err != nil {
-			return xerrors.Errorf("error purging tracking for dead blocks: %w", err)
-		}
-		log.Infow("purging dead from tracker done", "took", time.Since(startPurge))
-	}
-
-	// we are done; do some housekeeping
-	err = s.tracker.Sync()
-	if err != nil {
-		return xerrors.Errorf("error syncing tracker: %w", err)
-	}
-
-	s.gcHotstore()
-
-	err = s.setBaseEpoch(coldEpoch)
-	if err != nil {
-		return xerrors.Errorf("error saving base epoch: %w", err)
-	}
-
-	err = s.ds.Put(markSetSizeKey, int64ToBytes(s.markSetSize))
-	if err != nil {
-		return xerrors.Errorf("error saving mark set size: %w", err)
-	}
-
-	return nil
-}
-
 func (s *SplitStore) setBaseEpoch(epoch abi.ChainEpoch) error {
 	s.baseEpoch = epoch
 	// write to datastore
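
Distilled, the sweep removed above classified every tracked object into one of three buckets. A condensed sketch of that decision (a hypothetical helper for illustration, not code that existed in this file):

	// hot:  written after coldEpoch, or reachable in the hot range
	// cold: reachable in the cold range, or anything non-hot when GC is off
	// dead: unreachable; deleted only when GC was enabled
	func classify(wrEpoch, coldEpoch abi.ChainEpoch, inHotSet, inColdSet, enableGC bool) string {
		if wrEpoch > coldEpoch || inHotSet {
			return "hot" // stays in the hotstore
		}
		if inColdSet || !enableGC {
			return "cold" // moved to the coldstore
		}
		return "dead" // purged from the hotstore
	}

With full compaction gone, only the simple path survives: a single marking pass and no dead bucket, so cold objects are moved to the coldstore rather than deleted.
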

(next changed file)

@@ -233,9 +233,6 @@ type Splitstore struct {
 	HotStoreType      string
 	TrackingStoreType string
 	MarkSetType       string
-	EnableFullCompaction bool
-	EnableGC             bool // EXPERIMENTAL
-	Archival             bool
 }

 // // Full Node

(next changed file)

@@ -80,9 +80,6 @@ func SplitBlockstore(cfg *config.Chainstore) func(lc fx.Lifecycle, r repo.Locked
 		cfg := &splitstore.Config{
 			TrackingStoreType: cfg.Splitstore.TrackingStoreType,
 			MarkSetType:       cfg.Splitstore.MarkSetType,
-			EnableFullCompaction: cfg.Splitstore.EnableFullCompaction,
-			EnableGC:             cfg.Splitstore.EnableGC,
-			Archival:             cfg.Splitstore.Archival,
 		}
 		ss, err := splitstore.Open(path, ds, hot, cold, cfg)
 		if err != nil {
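
After this change, opening a splitstore needs only the two remaining options. A minimal usage sketch (hot, cold, ds and path stand for blockstores, a datastore and a repo path you already have; the "bolt" default and the Close method are assumptions, the latter inferred from the closing flag in the struct above):

	cfg := &splitstore.Config{
		TrackingStoreType: "bolt",  // assumption: the default tracking store
		MarkSetType:       "bloom", // "bloom" is the documented default
	}
	ss, err := splitstore.Open(path, ds, hot, cold, cfg)
	if err != nil {
		return err
	}
	defer ss.Close() //nolint:errcheck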