sortless compaction
This commit is contained in:
parent
a4f720d866
commit
dbc8903bac
@ -129,8 +129,6 @@ type SplitStore struct {
|
|||||||
|
|
||||||
headChangeMx sync.Mutex
|
headChangeMx sync.Mutex
|
||||||
|
|
||||||
coldPurgeSize int
|
|
||||||
|
|
||||||
chain ChainAccessor
|
chain ChainAccessor
|
||||||
ds dstore.Datastore
|
ds dstore.Datastore
|
||||||
cold bstore.Blockstore
|
cold bstore.Blockstore
|
||||||
@ -158,6 +156,7 @@ type SplitStore struct {
|
|||||||
txnRefsMx sync.Mutex
|
txnRefsMx sync.Mutex
|
||||||
txnRefs map[cid.Cid]struct{}
|
txnRefs map[cid.Cid]struct{}
|
||||||
txnMissing map[cid.Cid]struct{}
|
txnMissing map[cid.Cid]struct{}
|
||||||
|
txnMarkSet MarkSet
|
||||||
|
|
||||||
// registered protectors
|
// registered protectors
|
||||||
protectors []func(func(cid.Cid) error) error
|
protectors []func(func(cid.Cid) error) error
|
||||||
@ -194,8 +193,6 @@ func Open(path string, ds dstore.Datastore, hot, cold bstore.Blockstore, cfg *Co
|
|||||||
cold: cold,
|
cold: cold,
|
||||||
hot: hots,
|
hot: hots,
|
||||||
markSetEnv: markSetEnv,
|
markSetEnv: markSetEnv,
|
||||||
|
|
||||||
coldPurgeSize: defaultColdPurgeSize,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
ss.txnViewsCond.L = &ss.txnViewsMx
|
ss.txnViewsCond.L = &ss.txnViewsMx
|
||||||
@ -208,6 +205,14 @@ func Open(path string, ds dstore.Datastore, hot, cold bstore.Blockstore, cfg *Co
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if ss.checkpointExists() {
|
||||||
|
log.Info("found compaction checkpoint; resuming compaction")
|
||||||
|
if err := ss.completeCompaction(); err != nil {
|
||||||
|
markSetEnv.Close()
|
||||||
|
return nil, xerrors.Errorf("error resuming compaction: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return ss, nil
|
return ss, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -230,6 +235,16 @@ func (s *SplitStore) Has(ctx context.Context, cid cid.Cid) (bool, error) {
|
|||||||
s.txnLk.RLock()
|
s.txnLk.RLock()
|
||||||
defer s.txnLk.RUnlock()
|
defer s.txnLk.RUnlock()
|
||||||
|
|
||||||
|
// critical section
|
||||||
|
if s.txnMarkSet != nil {
|
||||||
|
has, err := s.txnMarkSet.Has(cid)
|
||||||
|
if has || err != nil {
|
||||||
|
return has, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return s.cold.Has(ctx, cid)
|
||||||
|
}
|
||||||
|
|
||||||
has, err := s.hot.Has(ctx, cid)
|
has, err := s.hot.Has(ctx, cid)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -257,6 +272,20 @@ func (s *SplitStore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error)
|
|||||||
s.txnLk.RLock()
|
s.txnLk.RLock()
|
||||||
defer s.txnLk.RUnlock()
|
defer s.txnLk.RUnlock()
|
||||||
|
|
||||||
|
// critical section
|
||||||
|
if s.txnMarkSet != nil {
|
||||||
|
has, err := s.txnMarkSet.Has(cid)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if has {
|
||||||
|
return s.hot.Get(ctx, cid)
|
||||||
|
}
|
||||||
|
|
||||||
|
return s.cold.Get(ctx, cid)
|
||||||
|
}
|
||||||
|
|
||||||
blk, err := s.hot.Get(ctx, cid)
|
blk, err := s.hot.Get(ctx, cid)
|
||||||
|
|
||||||
switch err {
|
switch err {
|
||||||
@ -294,6 +323,20 @@ func (s *SplitStore) GetSize(ctx context.Context, cid cid.Cid) (int, error) {
|
|||||||
s.txnLk.RLock()
|
s.txnLk.RLock()
|
||||||
defer s.txnLk.RUnlock()
|
defer s.txnLk.RUnlock()
|
||||||
|
|
||||||
|
// critical section
|
||||||
|
if s.txnMarkSet != nil {
|
||||||
|
has, err := s.txnMarkSet.Has(cid)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if has {
|
||||||
|
return s.hot.GetSize(ctx, cid)
|
||||||
|
}
|
||||||
|
|
||||||
|
return s.cold.GetSize(ctx, cid)
|
||||||
|
}
|
||||||
|
|
||||||
size, err := s.hot.GetSize(ctx, cid)
|
size, err := s.hot.GetSize(ctx, cid)
|
||||||
|
|
||||||
switch err {
|
switch err {
|
||||||
@ -332,6 +375,11 @@ func (s *SplitStore) Put(ctx context.Context, blk blocks.Block) error {
|
|||||||
|
|
||||||
s.debug.LogWrite(blk)
|
s.debug.LogWrite(blk)
|
||||||
|
|
||||||
|
// critical section
|
||||||
|
if s.txnMarkSet != nil {
|
||||||
|
return s.txnMarkSet.Mark(blk.Cid())
|
||||||
|
}
|
||||||
|
|
||||||
s.trackTxnRef(blk.Cid())
|
s.trackTxnRef(blk.Cid())
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -377,6 +425,11 @@ func (s *SplitStore) PutMany(ctx context.Context, blks []blocks.Block) error {
|
|||||||
|
|
||||||
s.debug.LogWriteMany(blks)
|
s.debug.LogWriteMany(blks)
|
||||||
|
|
||||||
|
// critical section
|
||||||
|
if s.txnMarkSet != nil {
|
||||||
|
return s.txnMarkSet.MarkMany(batch)
|
||||||
|
}
|
||||||
|
|
||||||
s.trackTxnRefMany(batch)
|
s.trackTxnRefMany(batch)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -436,6 +489,24 @@ func (s *SplitStore) View(ctx context.Context, cid cid.Cid, cb func([]byte) erro
|
|||||||
return cb(data)
|
return cb(data)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// critical section
|
||||||
|
s.txnLk.RLock()
|
||||||
|
if s.txnMarkSet != nil {
|
||||||
|
has, err := s.txnMarkSet.Has(cid)
|
||||||
|
s.txnLk.RUnlock()
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if has {
|
||||||
|
return s.hot.View(ctx, cid, cb)
|
||||||
|
}
|
||||||
|
|
||||||
|
return s.cold.View(ctx, cid, cb)
|
||||||
|
}
|
||||||
|
s.txnLk.RUnlock()
|
||||||
|
|
||||||
// views are (optimistically) protected two-fold:
|
// views are (optimistically) protected two-fold:
|
||||||
// - if there is an active transaction, then the reference is protected.
|
// - if there is an active transaction, then the reference is protected.
|
||||||
// - if there is no active transaction, active views are tracked in a
|
// - if there is no active transaction, active views are tracked in a
|
||||||
|
@ -3,8 +3,9 @@ package splitstore
|
|||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"errors"
|
"errors"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
"runtime"
|
"runtime"
|
||||||
"sort"
|
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
@ -387,6 +388,12 @@ func (s *SplitStore) compact(curTs *types.TipSet) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *SplitStore) doCompact(curTs *types.TipSet) error {
|
func (s *SplitStore) doCompact(curTs *types.TipSet) error {
|
||||||
|
if s.checkpointExists() {
|
||||||
|
// this really shouldn't happen, but if it somehow does, it means that the hotstore
|
||||||
|
// might be potentially inconsistent; abort compaction and notify the user to intervene.
|
||||||
|
return xerrors.Errorf("checkpoint exists; aborting compaction")
|
||||||
|
}
|
||||||
|
|
||||||
currentEpoch := curTs.Height()
|
currentEpoch := curTs.Height()
|
||||||
boundaryEpoch := currentEpoch - CompactionBoundary
|
boundaryEpoch := currentEpoch - CompactionBoundary
|
||||||
|
|
||||||
@ -409,9 +416,6 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// we are ready for concurrent marking
|
|
||||||
s.beginTxnMarking(markSet)
|
|
||||||
|
|
||||||
// 0. track all protected references at beginning of compaction; anything added later should
|
// 0. track all protected references at beginning of compaction; anything added later should
|
||||||
// be transactionally protected by the write
|
// be transactionally protected by the write
|
||||||
log.Info("protecting references with registered protectors")
|
log.Info("protecting references with registered protectors")
|
||||||
@ -425,7 +429,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
|
|||||||
log.Info("marking reachable objects")
|
log.Info("marking reachable objects")
|
||||||
startMark := time.Now()
|
startMark := time.Now()
|
||||||
|
|
||||||
var count int64
|
count := new(int64)
|
||||||
err = s.walkChain(curTs, boundaryEpoch, inclMsgsEpoch, &noopVisitor{},
|
err = s.walkChain(curTs, boundaryEpoch, inclMsgsEpoch, &noopVisitor{},
|
||||||
func(c cid.Cid) error {
|
func(c cid.Cid) error {
|
||||||
if isUnitaryObject(c) {
|
if isUnitaryObject(c) {
|
||||||
@ -441,7 +445,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
|
|||||||
return errStopWalk
|
return errStopWalk
|
||||||
}
|
}
|
||||||
|
|
||||||
count++
|
atomic.AddInt64(count, 1)
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
||||||
@ -449,9 +453,9 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
|
|||||||
return xerrors.Errorf("error marking: %w", err)
|
return xerrors.Errorf("error marking: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
s.markSetSize = count + count>>2 // overestimate a bit
|
s.markSetSize = *count + *count>>2 // overestimate a bit
|
||||||
|
|
||||||
log.Infow("marking done", "took", time.Since(startMark), "marked", count)
|
log.Infow("marking done", "took", time.Since(startMark), "marked", *count)
|
||||||
|
|
||||||
if err := s.checkClosing(); err != nil {
|
if err := s.checkClosing(); err != nil {
|
||||||
return err
|
return err
|
||||||
@ -471,10 +475,15 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
|
|||||||
log.Info("collecting cold objects")
|
log.Info("collecting cold objects")
|
||||||
startCollect := time.Now()
|
startCollect := time.Now()
|
||||||
|
|
||||||
|
coldw, err := NewColdSetWriter(s.coldSetPath())
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("error creating coldset: %w", err)
|
||||||
|
}
|
||||||
|
defer coldw.Close() //nolint:errcheck
|
||||||
|
|
||||||
// some stats for logging
|
// some stats for logging
|
||||||
var hotCnt, coldCnt int
|
var hotCnt, coldCnt int
|
||||||
|
|
||||||
cold := make([]cid.Cid, 0, s.coldPurgeSize)
|
|
||||||
err = s.hot.ForEachKey(func(c cid.Cid) error {
|
err = s.hot.ForEachKey(func(c cid.Cid) error {
|
||||||
// was it marked?
|
// was it marked?
|
||||||
mark, err := markSet.Has(c)
|
mark, err := markSet.Has(c)
|
||||||
@ -488,7 +497,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// it's cold, mark it as candidate for move
|
// it's cold, mark it as candidate for move
|
||||||
cold = append(cold, c)
|
coldw.Write(c)
|
||||||
coldCnt++
|
coldCnt++
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
@ -498,12 +507,12 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
|
|||||||
return xerrors.Errorf("error collecting cold objects: %w", err)
|
return xerrors.Errorf("error collecting cold objects: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Infow("cold collection done", "took", time.Since(startCollect))
|
if err := coldw.Close(); err != nil {
|
||||||
|
return xerrors.Errorf("error closing coldset: %w", err)
|
||||||
if coldCnt > 0 {
|
|
||||||
s.coldPurgeSize = coldCnt + coldCnt>>2 // overestimate a bit
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.Infow("cold collection done", "took", time.Since(startCollect))
|
||||||
|
|
||||||
log.Infow("compaction stats", "hot", hotCnt, "cold", coldCnt)
|
log.Infow("compaction stats", "hot", hotCnt, "cold", coldCnt)
|
||||||
stats.Record(s.ctx, metrics.SplitstoreCompactionHot.M(int64(hotCnt)))
|
stats.Record(s.ctx, metrics.SplitstoreCompactionHot.M(int64(hotCnt)))
|
||||||
stats.Record(s.ctx, metrics.SplitstoreCompactionCold.M(int64(coldCnt)))
|
stats.Record(s.ctx, metrics.SplitstoreCompactionCold.M(int64(coldCnt)))
|
||||||
@ -512,20 +521,17 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// now that we have collected cold objects, check for missing references from transactional i/o
|
coldr, err := NewColdSetReader(s.coldSetPath())
|
||||||
// and disable further collection of such references (they will not be acted upon as we can't
|
if err != nil {
|
||||||
// possibly delete objects we didn't have when we were collecting cold objects)
|
return xerrors.Errorf("error opening coldset: %w", err)
|
||||||
s.waitForMissingRefs(markSet)
|
|
||||||
|
|
||||||
if err := s.checkClosing(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
defer coldr.Close() //nolint:errcheck
|
||||||
|
|
||||||
// 3. copy the cold objects to the coldstore -- if we have one
|
// 3. copy the cold objects to the coldstore -- if we have one
|
||||||
if !s.cfg.DiscardColdBlocks {
|
if !s.cfg.DiscardColdBlocks {
|
||||||
log.Info("moving cold objects to the coldstore")
|
log.Info("moving cold objects to the coldstore")
|
||||||
startMove := time.Now()
|
startMove := time.Now()
|
||||||
err = s.moveColdBlocks(cold)
|
err = s.moveColdBlocks(coldr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("error moving cold objects: %w", err)
|
return xerrors.Errorf("error moving cold objects: %w", err)
|
||||||
}
|
}
|
||||||
@ -534,41 +540,57 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
|
|||||||
if err := s.checkClosing(); err != nil {
|
if err := s.checkClosing(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := coldr.Reset(); err != nil {
|
||||||
|
return xerrors.Errorf("error resetting coldset: %w", err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// 4. sort cold objects so that the dags with most references are deleted first
|
// 4. Purge cold objects with checkpointing for recovery.
|
||||||
// this ensures that we can't refer to a dag with its consituents already deleted, ie
|
// This is the critical section of compaction, whereby any cold object not in the markSet is
|
||||||
// we lave no dangling references.
|
// considered already deleted.
|
||||||
log.Info("sorting cold objects")
|
// We delete cold objects in batches, holding the transaction lock, where we check the markSet
|
||||||
startSort := time.Now()
|
// again for new references created by the VM.
|
||||||
err = s.sortObjects(cold)
|
// After each batch, we write a checkpoint to disk; if the process is interrupted before completion,
|
||||||
if err != nil {
|
// the process will continue from the checkpoint in the next recovery.
|
||||||
return xerrors.Errorf("error sorting objects: %w", err)
|
if err := s.beginCriticalSection(markSet); err != nil {
|
||||||
}
|
return xerrors.Errorf("error beginning critical section: %w", err)
|
||||||
log.Infow("sorting done", "took", time.Since(startSort))
|
|
||||||
|
|
||||||
// 4.1 protect transactional refs once more
|
|
||||||
// strictly speaking, this is not necessary as purge will do it before deleting each
|
|
||||||
// batch. however, there is likely a largish number of references accumulated during
|
|
||||||
// ths sort and this protects before entering pruge context.
|
|
||||||
err = s.protectTxnRefs(markSet)
|
|
||||||
if err != nil {
|
|
||||||
return xerrors.Errorf("error protecting transactional refs: %w", err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := s.checkClosing(); err != nil {
|
if err := s.checkClosing(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
checkpoint, err := NewCheckpoint(s.checkpointPath())
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("error creating checkpoint: %w", err)
|
||||||
|
}
|
||||||
|
defer checkpoint.Close() //nolint:errcheck
|
||||||
|
|
||||||
// 5. purge cold objects from the hotstore, taking protected references into account
|
// 5. purge cold objects from the hotstore, taking protected references into account
|
||||||
log.Info("purging cold objects from the hotstore")
|
log.Info("purging cold objects from the hotstore")
|
||||||
startPurge := time.Now()
|
startPurge := time.Now()
|
||||||
err = s.purge(cold, markSet)
|
err = s.purge(coldr, checkpoint, markSet)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("error purging cold blocks: %w", err)
|
return xerrors.Errorf("error purging cold objects: %w", err)
|
||||||
}
|
}
|
||||||
log.Infow("purging cold objects from hotstore done", "took", time.Since(startPurge))
|
log.Infow("purging cold objects from hotstore done", "took", time.Since(startPurge))
|
||||||
|
|
||||||
|
s.endCriticalSection()
|
||||||
|
|
||||||
|
if err := checkpoint.Close(); err != nil {
|
||||||
|
log.Warnf("error closing checkpoint: %s", err)
|
||||||
|
}
|
||||||
|
if err := os.Remove(s.checkpointPath()); err != nil {
|
||||||
|
log.Warnf("error removing checkpoint: %s", err)
|
||||||
|
}
|
||||||
|
if err := coldr.Close(); err != nil {
|
||||||
|
log.Warnf("error closing coldset: %s", err)
|
||||||
|
}
|
||||||
|
if err := os.Remove(s.coldSetPath()); err != nil {
|
||||||
|
log.Warnf("error removing coldset: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
// we are done; do some housekeeping
|
// we are done; do some housekeeping
|
||||||
s.endTxnProtect()
|
s.endTxnProtect()
|
||||||
s.gcHotstore()
|
s.gcHotstore()
|
||||||
@ -603,8 +625,22 @@ func (s *SplitStore) beginTxnProtect() {
|
|||||||
s.txnMissing = make(map[cid.Cid]struct{})
|
s.txnMissing = make(map[cid.Cid]struct{})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SplitStore) beginTxnMarking(markSet MarkSet) {
|
func (s *SplitStore) beginCriticalSection(markSet MarkSet) error {
|
||||||
log.Info("beginning transactional marking")
|
log.Info("beginning critical section")
|
||||||
|
|
||||||
|
if err := markSet.BeginCriticalSection(); err != nil {
|
||||||
|
return xerrors.Errorf("error beginning critical section for markset: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
s.txnLk.Lock()
|
||||||
|
s.txnMarkSet = markSet
|
||||||
|
s.txnLk.Unlock()
|
||||||
|
|
||||||
|
if err := s.protectTxnRefs(markSet); err != nil {
|
||||||
|
return xerrors.Errorf("error protecting transactional references: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SplitStore) endTxnProtect() {
|
func (s *SplitStore) endTxnProtect() {
|
||||||
@ -618,6 +654,17 @@ func (s *SplitStore) endTxnProtect() {
|
|||||||
s.txnActive = false
|
s.txnActive = false
|
||||||
s.txnRefs = nil
|
s.txnRefs = nil
|
||||||
s.txnMissing = nil
|
s.txnMissing = nil
|
||||||
|
s.txnMarkSet = nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *SplitStore) endCriticalSection() {
|
||||||
|
log.Info("ending critical section")
|
||||||
|
|
||||||
|
s.txnLk.Lock()
|
||||||
|
defer s.txnLk.Unlock()
|
||||||
|
|
||||||
|
s.txnMarkSet.EndCriticalSection()
|
||||||
|
s.txnMarkSet = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SplitStore) walkChain(ts *types.TipSet, inclState, inclMsgs abi.ChainEpoch,
|
func (s *SplitStore) walkChain(ts *types.TipSet, inclState, inclMsgs abi.ChainEpoch,
|
||||||
@ -892,10 +939,10 @@ func (s *SplitStore) has(c cid.Cid) (bool, error) {
|
|||||||
return s.cold.Has(s.ctx, c)
|
return s.cold.Has(s.ctx, c)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SplitStore) moveColdBlocks(cold []cid.Cid) error {
|
func (s *SplitStore) moveColdBlocks(coldr *ColdSetReader) error {
|
||||||
batch := make([]blocks.Block, 0, batchSize)
|
batch := make([]blocks.Block, 0, batchSize)
|
||||||
|
|
||||||
for _, c := range cold {
|
coldr.ForEach(func(c cid.Cid) error {
|
||||||
if err := s.checkClosing(); err != nil {
|
if err := s.checkClosing(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -904,7 +951,7 @@ func (s *SplitStore) moveColdBlocks(cold []cid.Cid) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
if err == bstore.ErrNotFound {
|
if err == bstore.ErrNotFound {
|
||||||
log.Warnf("hotstore missing block %s", c)
|
log.Warnf("hotstore missing block %s", c)
|
||||||
continue
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return xerrors.Errorf("error retrieving block %s from hotstore: %w", c, err)
|
return xerrors.Errorf("error retrieving block %s from hotstore: %w", c, err)
|
||||||
@ -918,7 +965,9 @@ func (s *SplitStore) moveColdBlocks(cold []cid.Cid) error {
|
|||||||
}
|
}
|
||||||
batch = batch[:0]
|
batch = batch[:0]
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
if len(batch) > 0 {
|
if len(batch) > 0 {
|
||||||
err := s.cold.PutMany(s.ctx, batch)
|
err := s.cold.PutMany(s.ctx, batch)
|
||||||
@ -930,160 +979,60 @@ func (s *SplitStore) moveColdBlocks(cold []cid.Cid) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// sorts a slice of objects heaviest first -- it's a little expensive but worth the
|
func (s *SplitStore) purge(coldr *ColdSetReader, checkpoint *Checkpoint, markSet MarkSet) error {
|
||||||
// guarantee that we don't leave dangling references behind, e.g. if we die in the middle
|
batch := make([]cid.Cid, 0, batchSize)
|
||||||
// of a purge.
|
|
||||||
func (s *SplitStore) sortObjects(cids []cid.Cid) error {
|
|
||||||
// we cache the keys to avoid making a gazillion of strings
|
|
||||||
keys := make(map[cid.Cid]string)
|
|
||||||
key := func(c cid.Cid) string {
|
|
||||||
s, ok := keys[c]
|
|
||||||
if !ok {
|
|
||||||
s = string(c.Hash())
|
|
||||||
keys[c] = s
|
|
||||||
}
|
|
||||||
return s
|
|
||||||
}
|
|
||||||
|
|
||||||
// compute sorting weights as the cumulative number of DAG links
|
|
||||||
weights := make(map[string]int)
|
|
||||||
for _, c := range cids {
|
|
||||||
// this can take quite a while, so check for shutdown with every opportunity
|
|
||||||
if err := s.checkClosing(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
w := s.getObjectWeight(c, weights, key)
|
|
||||||
weights[key(c)] = w
|
|
||||||
}
|
|
||||||
|
|
||||||
// sort!
|
|
||||||
sort.Slice(cids, func(i, j int) bool {
|
|
||||||
wi := weights[key(cids[i])]
|
|
||||||
wj := weights[key(cids[j])]
|
|
||||||
if wi == wj {
|
|
||||||
return bytes.Compare(cids[i].Hash(), cids[j].Hash()) > 0
|
|
||||||
}
|
|
||||||
|
|
||||||
return wi > wj
|
|
||||||
})
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *SplitStore) getObjectWeight(c cid.Cid, weights map[string]int, key func(cid.Cid) string) int {
|
|
||||||
w, ok := weights[key(c)]
|
|
||||||
if ok {
|
|
||||||
return w
|
|
||||||
}
|
|
||||||
|
|
||||||
// we treat block headers specially to avoid walking the entire chain
|
|
||||||
var hdr types.BlockHeader
|
|
||||||
err := s.view(c, func(data []byte) error {
|
|
||||||
return hdr.UnmarshalCBOR(bytes.NewBuffer(data))
|
|
||||||
})
|
|
||||||
if err == nil {
|
|
||||||
w1 := s.getObjectWeight(hdr.ParentStateRoot, weights, key)
|
|
||||||
weights[key(hdr.ParentStateRoot)] = w1
|
|
||||||
|
|
||||||
w2 := s.getObjectWeight(hdr.Messages, weights, key)
|
|
||||||
weights[key(hdr.Messages)] = w2
|
|
||||||
|
|
||||||
return 1 + w1 + w2
|
|
||||||
}
|
|
||||||
|
|
||||||
var links []cid.Cid
|
|
||||||
err = s.view(c, func(data []byte) error {
|
|
||||||
return cbg.ScanForLinks(bytes.NewReader(data), func(c cid.Cid) {
|
|
||||||
links = append(links, c)
|
|
||||||
})
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return 1
|
|
||||||
}
|
|
||||||
|
|
||||||
w = 1
|
|
||||||
for _, c := range links {
|
|
||||||
// these are internal refs, so dags will be dags
|
|
||||||
if c.Prefix().Codec != cid.DagCBOR {
|
|
||||||
w++
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
wc := s.getObjectWeight(c, weights, key)
|
|
||||||
weights[key(c)] = wc
|
|
||||||
|
|
||||||
w += wc
|
|
||||||
}
|
|
||||||
|
|
||||||
return w
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *SplitStore) purgeBatch(cids []cid.Cid, deleteBatch func([]cid.Cid) error) error {
|
|
||||||
if len(cids) == 0 {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// we don't delete one giant batch of millions of objects, but rather do smaller batches
|
|
||||||
// so that we don't stop the world for an extended period of time
|
|
||||||
done := false
|
|
||||||
for i := 0; !done; i++ {
|
|
||||||
start := i * batchSize
|
|
||||||
end := start + batchSize
|
|
||||||
if end >= len(cids) {
|
|
||||||
end = len(cids)
|
|
||||||
done = true
|
|
||||||
}
|
|
||||||
|
|
||||||
err := deleteBatch(cids[start:end])
|
|
||||||
if err != nil {
|
|
||||||
return xerrors.Errorf("error deleting batch: %w", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *SplitStore) purge(cids []cid.Cid, markSet MarkSet) error {
|
|
||||||
deadCids := make([]cid.Cid, 0, batchSize)
|
deadCids := make([]cid.Cid, 0, batchSize)
|
||||||
|
|
||||||
var purgeCnt, liveCnt int
|
var purgeCnt, liveCnt int
|
||||||
defer func() {
|
defer func() {
|
||||||
log.Infow("purged cold objects", "purged", purgeCnt, "live", liveCnt)
|
log.Infow("purged cold objects", "purged", purgeCnt, "live", liveCnt)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
return s.purgeBatch(cids,
|
deleteBatch := func() error {
|
||||||
func(cids []cid.Cid) error {
|
pc, lc, err := s.purgeBatch(batch, deadCids, checkpoint, markSet)
|
||||||
deadCids := deadCids[:0]
|
|
||||||
|
purgeCnt += pc
|
||||||
|
liveCnt += lc
|
||||||
|
batch = batch[:0]
|
||||||
|
|
||||||
for {
|
|
||||||
if err := s.checkClosing(); err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
s.txnLk.Lock()
|
err := coldr.ForEach(func(c cid.Cid) error {
|
||||||
if len(s.txnRefs) == 0 {
|
batch = append(batch, c)
|
||||||
// keep the lock!
|
if len(batch) == batchSize {
|
||||||
break
|
return deleteBatch()
|
||||||
}
|
}
|
||||||
|
|
||||||
// unlock and protect
|
return nil
|
||||||
s.txnLk.Unlock()
|
})
|
||||||
|
|
||||||
err := s.protectTxnRefs(markSet)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("error protecting transactional refs: %w", err)
|
return err
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if len(batch) > 0 {
|
||||||
|
return deleteBatch()
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *SplitStore) purgeBatch(batch, deadCids []cid.Cid, checkpoint *Checkpoint, markSet MarkSet) (purgeCnt int, liveCnt int, err error) {
|
||||||
|
if err := s.checkClosing(); err != nil {
|
||||||
|
return 0, 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
s.txnLk.Lock()
|
||||||
defer s.txnLk.Unlock()
|
defer s.txnLk.Unlock()
|
||||||
|
|
||||||
for _, c := range cids {
|
for _, c := range batch {
|
||||||
live, err := markSet.Has(c)
|
has, err := markSet.Has(c)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("error checking for liveness: %w", err)
|
return 0, 0, xerrors.Errorf("error checking markset for liveness: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if live {
|
if has {
|
||||||
liveCnt++
|
liveCnt++
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@ -1091,91 +1040,139 @@ func (s *SplitStore) purge(cids []cid.Cid, markSet MarkSet) error {
|
|||||||
deadCids = append(deadCids, c)
|
deadCids = append(deadCids, c)
|
||||||
}
|
}
|
||||||
|
|
||||||
err := s.hot.DeleteMany(s.ctx, deadCids)
|
if len(deadCids) == 0 {
|
||||||
if err != nil {
|
if err := checkpoint.Set(batch[len(batch)-1]); err != nil {
|
||||||
return xerrors.Errorf("error purging cold objects: %w", err)
|
return 0, 0, xerrors.Errorf("error setting checkpoint: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0, liveCnt, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := s.hot.DeleteMany(s.ctx, deadCids); err != nil {
|
||||||
|
return 0, liveCnt, xerrors.Errorf("error purging cold objects: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
s.debug.LogDelete(deadCids)
|
s.debug.LogDelete(deadCids)
|
||||||
|
purgeCnt = len(deadCids)
|
||||||
|
|
||||||
purgeCnt += len(deadCids)
|
if err := checkpoint.Set(batch[len(batch)-1]); err != nil {
|
||||||
|
return purgeCnt, liveCnt, xerrors.Errorf("error setting checkpoint: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return purgeCnt, liveCnt, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *SplitStore) coldSetPath() string {
|
||||||
|
return filepath.Join(s.path, "coldset")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *SplitStore) checkpointPath() string {
|
||||||
|
return filepath.Join(s.path, "checkpoint")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *SplitStore) checkpointExists() bool {
|
||||||
|
_, err := os.Stat(s.checkpointPath())
|
||||||
|
return err == nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *SplitStore) completeCompaction() error {
|
||||||
|
checkpoint, last, err := OpenCheckpoint(s.checkpointPath())
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("error opening checkpoint: %w", err)
|
||||||
|
}
|
||||||
|
defer checkpoint.Close() //nolint:errcheck
|
||||||
|
|
||||||
|
coldr, err := NewColdSetReader(s.coldSetPath())
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("error opening coldset: %w", err)
|
||||||
|
}
|
||||||
|
defer coldr.Close() //nolint:errcheck
|
||||||
|
|
||||||
|
markSet, err := s.markSetEnv.Recover("live")
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("error recovering markset: %w", err)
|
||||||
|
}
|
||||||
|
defer markSet.Close() //nolint:errcheck
|
||||||
|
|
||||||
|
// PURGE
|
||||||
|
log.Info("purging cold objects from the hotstore")
|
||||||
|
startPurge := time.Now()
|
||||||
|
err = s.completePurge(coldr, checkpoint, last, markSet)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("error purging cold objects: %w", err)
|
||||||
|
}
|
||||||
|
log.Infow("purging cold objects from hotstore done", "took", time.Since(startPurge))
|
||||||
|
|
||||||
|
markSet.EndCriticalSection()
|
||||||
|
|
||||||
|
if err := checkpoint.Close(); err != nil {
|
||||||
|
log.Warnf("error closing checkpoint: %s", err)
|
||||||
|
}
|
||||||
|
if err := os.Remove(s.checkpointPath()); err != nil {
|
||||||
|
log.Warnf("error removing checkpoint: %s", err)
|
||||||
|
}
|
||||||
|
if err := coldr.Close(); err != nil {
|
||||||
|
log.Warnf("error closing coldset: %s", err)
|
||||||
|
}
|
||||||
|
if err := os.Remove(s.coldSetPath()); err != nil {
|
||||||
|
log.Warnf("error removing coldset: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Note: at this point we can start the splitstore; a compaction should run on
|
||||||
|
// the first head change, which will trigger gc on the hotstore.
|
||||||
|
// We don't mind the second (back-to-back) compaction as the head will
|
||||||
|
// have advanced during marking and coldset accumulation.
|
||||||
return nil
|
return nil
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// I really don't like having this code, but we seem to have some occasional DAG references with
|
func (s *SplitStore) completePurge(coldr *ColdSetReader, checkpoint *Checkpoint, start cid.Cid, markSet MarkSet) error {
|
||||||
// missing constituents. During testing in mainnet *some* of these references *sometimes* appeared
|
if !start.Defined() {
|
||||||
// after a little bit.
|
return s.purge(coldr, checkpoint, markSet)
|
||||||
// We need to figure out where they are coming from and eliminate that vector, but until then we
|
|
||||||
// have this gem[TM].
|
|
||||||
// My best guess is that they are parent message receipts or yet to be computed state roots; magik
|
|
||||||
// thinks the cause may be block validation.
|
|
||||||
func (s *SplitStore) waitForMissingRefs(markSet MarkSet) {
|
|
||||||
s.txnLk.Lock()
|
|
||||||
missing := s.txnMissing
|
|
||||||
s.txnMissing = nil
|
|
||||||
s.txnLk.Unlock()
|
|
||||||
|
|
||||||
if len(missing) == 0 {
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Info("waiting for missing references")
|
seeking := true
|
||||||
start := time.Now()
|
batch := make([]cid.Cid, 0, batchSize)
|
||||||
count := 0
|
deadCids := make([]cid.Cid, 0, batchSize)
|
||||||
|
|
||||||
|
var purgeCnt, liveCnt int
|
||||||
defer func() {
|
defer func() {
|
||||||
log.Infow("waiting for missing references done", "took", time.Since(start), "marked", count)
|
log.Infow("purged cold objects", "purged", purgeCnt, "live", liveCnt)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
for i := 0; i < 3 && len(missing) > 0; i++ {
|
deleteBatch := func() error {
|
||||||
if err := s.checkClosing(); err != nil {
|
pc, lc, err := s.purgeBatch(batch, deadCids, checkpoint, markSet)
|
||||||
return
|
|
||||||
|
purgeCnt += pc
|
||||||
|
liveCnt += lc
|
||||||
|
batch = batch[:0]
|
||||||
|
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
wait := time.Duration(i) * time.Minute
|
err := coldr.ForEach(func(c cid.Cid) error {
|
||||||
log.Infof("retrying for %d missing references in %s (attempt: %d)", len(missing), wait, i+1)
|
if seeking {
|
||||||
if wait > 0 {
|
if start.Equals(c) {
|
||||||
time.Sleep(wait)
|
seeking = false
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
batch = append(batch, c)
|
||||||
|
if len(batch) == batchSize {
|
||||||
|
return deleteBatch()
|
||||||
}
|
}
|
||||||
|
|
||||||
towalk := missing
|
|
||||||
visitor := newTmpVisitor()
|
|
||||||
missing = make(map[cid.Cid]struct{})
|
|
||||||
|
|
||||||
for c := range towalk {
|
|
||||||
err := s.walkObjectIncomplete(c, visitor,
|
|
||||||
func(c cid.Cid) error {
|
|
||||||
if isUnitaryObject(c) {
|
|
||||||
return errStopWalk
|
|
||||||
}
|
|
||||||
|
|
||||||
visit, err := markSet.Visit(c)
|
|
||||||
if err != nil {
|
|
||||||
return xerrors.Errorf("error visiting object: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if !visit {
|
|
||||||
return errStopWalk
|
|
||||||
}
|
|
||||||
|
|
||||||
count++
|
|
||||||
return nil
|
return nil
|
||||||
},
|
|
||||||
func(c cid.Cid) error {
|
|
||||||
missing[c] = struct{}{}
|
|
||||||
return errStopWalk
|
|
||||||
})
|
})
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warnf("error marking: %s", err)
|
return err
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(missing) > 0 {
|
if len(batch) > 0 {
|
||||||
log.Warnf("still missing %d references", len(missing))
|
return deleteBatch()
|
||||||
for c := range missing {
|
|
||||||
log.Warnf("unresolved missing reference: %s", c)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user