improve comments

parent fdff1bebc9
commit c1c25868cc
@@ -114,13 +114,13 @@ type ChainAccessor interface {
 type SplitStore struct {
 	compacting  int32 // compaction (or warm up) in progress
 	critsection int32 // compaction critical section
-	closing     int32 // the split store is closing
+	closing     int32 // the splitstore is closing

 	cfg *Config

 	baseEpoch   abi.ChainEpoch
 	warmupEpoch abi.ChainEpoch
-	writeEpoch  abi.ChainEpoch // for debug logging
+	writeEpoch  abi.ChainEpoch // for debug logging only

 	coldPurgeSize int

@@ -140,7 +140,7 @@ type SplitStore struct {

 	debug *debugLog

-	// protection for concurrent read/writes during compaction
+	// transactional protection for concurrent read/writes during compaction
 	txnLk            sync.RWMutex
 	txnActive        bool
 	txnLookbackEpoch abi.ChainEpoch
@@ -429,15 +429,13 @@ func (s *SplitStore) Start(chain ChainAccessor) error {
 	}

 	// load warmup epoch from metadata ds
-	// if none, then the splitstore will warm up the hotstore at first head change notif
-	// by walking the current tipset
 	bs, err = s.ds.Get(warmupEpochKey)
 	switch err {
 	case nil:
 		s.warmupEpoch = bytesToEpoch(bs)

 	case dstore.ErrNotFound:
-		// the hotstore hasn't warmed up, load the genesis into the hotstore
+		// the hotstore hasn't warmed up, start a concurrent warm up
 		err = s.warmup(s.curTs)
 		if err != nil {
 			return xerrors.Errorf("error warming up: %w", err)
@@ -447,8 +445,7 @@ func (s *SplitStore) Start(chain ChainAccessor) error {
 		return xerrors.Errorf("error loading warmup epoch: %w", err)
 	}

-	// load markSetSize from metadata ds
-	// if none, the splitstore will compute it during warmup and update in every compaction
+	// load markSetSize from metadata ds to provide a size hint for marksets
 	bs, err = s.ds.Get(markSetSizeKey)
 	switch err {
 	case nil:
@@ -504,7 +501,7 @@ func (s *SplitStore) HeadChange(_, apply []*types.TipSet) error {
 	}

 	if !atomic.CompareAndSwapInt32(&s.compacting, 0, 1) {
-		// we are currently compacting, do nothing and wait for the next head change
+		// we are currently compacting (or warming up); do nothing and wait for the next head change
 		return nil
 	}

@@ -568,6 +565,7 @@ func (s *SplitStore) updateWriteEpoch() {
 	}
 }

+// transactionally protect a reference to an object
 func (s *SplitStore) trackTxnRef(c cid.Cid) error {
 	if !s.txnActive {
 		// not compacting
@@ -586,6 +584,7 @@ func (s *SplitStore) trackTxnRef(c cid.Cid) error {
 	return s.doTxnProtect(c, nil)
 }

+// transactionally protect a batch of references
 func (s *SplitStore) trackTxnRefMany(cids []cid.Cid) error {
 	if !s.txnActive {
 		// not compacting
@@ -593,7 +592,7 @@ func (s *SplitStore) trackTxnRefMany(cids []cid.Cid) error {
 	}

 	if s.txnRefs != nil {
-		// we haven't finished marking yet, so track the reference
+		// we haven't finished marking yet, so track the references
 		s.txnRefsMx.Lock()
 		for _, c := range cids {
 			s.txnRefs[c] = struct{}{}
@@ -618,6 +617,8 @@ func (s *SplitStore) trackTxnRefMany(cids []cid.Cid) error {
 	return nil
 }

+// transactionally protect a reference by walking the object and marking.
+// concurrent markings are short-circuited by checking the markset.
 func (s *SplitStore) doTxnProtect(root cid.Cid, batch map[cid.Cid]struct{}) error {
 	// Note: cold objects are deleted heaviest first, so the constituents of an object
 	// cannot be deleted before the object itself.
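The two comment lines added above describe marking that short-circuits on already-marked objects. As a rough illustration only (hypothetical types: plain strings stand in for CIDs, an in-memory map stands in for the real markset, and `protect`/`visit` are not lotus APIs), the pattern looks like this:

```go
package main

import (
	"fmt"
	"sync"
)

// markSet is a hypothetical in-memory stand-in for the splitstore's markset.
type markSet struct {
	mx    sync.Mutex
	marks map[string]struct{}
}

// visit marks a key and reports whether it was newly marked; already-marked
// keys short-circuit the walk, so concurrent markings do not repeat work.
func (m *markSet) visit(key string) bool {
	m.mx.Lock()
	defer m.mx.Unlock()
	if _, ok := m.marks[key]; ok {
		return false
	}
	m.marks[key] = struct{}{}
	return true
}

// protect walks an object graph (children returns the references held by a key)
// and marks everything reachable from root, stopping at already-marked nodes.
func protect(root string, children func(string) []string, ms *markSet) {
	if !ms.visit(root) {
		return // already protected, e.g. by a concurrent marking
	}
	for _, ref := range children(root) {
		protect(ref, children, ms)
	}
}

func main() {
	graph := map[string][]string{
		"stateRoot": {"actorA", "actorB"},
		"actorA":    {"headCID"},
	}
	ms := &markSet{marks: make(map[string]struct{})}
	protect("stateRoot", func(k string) []string { return graph[k] }, ms)
	fmt.Println(len(ms.marks), "objects protected") // 4 objects protected
}
```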
@@ -674,6 +675,9 @@ func (s *SplitStore) doTxnProtect(root cid.Cid, batch map[cid.Cid]struct{}) erro
 	return err
 }

+// warmup acquires the compaction lock and spawns a goroutine to warm up the hotstore;
+// this is necessary when we sync from a snapshot or when we enable the splitstore
+// on top of an existing blockstore (which becomes the coldstore).
 func (s *SplitStore) warmup(curTs *types.TipSet) error {
 	if !atomic.CompareAndSwapInt32(&s.compacting, 0, 1) {
 		return xerrors.Errorf("error locking compaction")
@@ -697,6 +701,9 @@ func (s *SplitStore) warmup(curTs *types.TipSet) error {
 	return nil
 }

+// the actual warmup procedure; it walks the chain loading all state roots at the boundary
+// and headers all the way up to genesis.
+// objects are written in batches so as to minimize overhead.
 func (s *SplitStore) doWarmup(curTs *types.TipSet) error {
 	epoch := curTs.Height()
 	batchHot := make([]blocks.Block, 0, batchSize)
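The doWarmup comment above mentions that objects are written in batches to minimize overhead. A generic sketch of that batching pattern, with an illustrative `putManyStore` interface that is not the lotus blockstore API:

```go
package main

import "fmt"

// putManyStore is an illustrative sink with a batched write API; it is not
// the lotus blockstore interface.
type putManyStore interface {
	PutMany(blocks []string) error
}

type printStore struct{}

func (printStore) PutMany(blocks []string) error {
	fmt.Printf("wrote batch of %d blocks\n", len(blocks))
	return nil
}

// warmupCopy copies blocks into dst in fixed-size batches so each write
// amortizes per-call overhead, flushing any remainder at the end.
func warmupCopy(dst putManyStore, blocks []string, batchSize int) error {
	batch := make([]string, 0, batchSize)
	for _, b := range blocks {
		batch = append(batch, b)
		if len(batch) == batchSize {
			if err := dst.PutMany(batch); err != nil {
				return err
			}
			batch = batch[:0]
		}
	}
	if len(batch) > 0 {
		return dst.PutMany(batch)
	}
	return nil
}

func main() {
	_ = warmupCopy(printStore{}, []string{"h1", "h2", "h3", "h4", "h5"}, 2)
}
```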
@@ -772,7 +779,17 @@ func (s *SplitStore) doWarmup(curTs *types.TipSet) error {
 	return nil
 }

-// Compaction/GC Algorithm
+// --- Compaction ---
+// Compaction works transactionally with the following algorithm:
+// - We prepare a transaction, whereby all i/o referenced objects through the API are tracked.
+// - We walk the chain and mark reachable objects, keeping 4 finalities of state roots and messages and all headers all the way to genesis.
+// - Once the chain walk is complete, we begin full transaction protection with concurrent marking; we walk and mark all references created during the chain walk. At the same time, all I/O through the API concurrently marks objects as live references.
+// - We collect cold objects by iterating through the hotstore and checking the mark set; if an object is not marked, then it is a candidate for purge.
+// - When running with a coldstore, we next copy all cold objects to the coldstore.
+// - At this point we are ready to begin purging:
+//   - We sort cold objects heaviest first, so as to never delete the constituents of a DAG before the DAG itself (which would leave dangling references)
+//   - We delete in small batches taking a lock; each batch is checked again for marks, from the concurrent transactional mark, so as to never delete anything live
+// - We then end the transaction and compact/gc the hotstore.
 func (s *SplitStore) compact(curTs *types.TipSet) {
 	start := time.Now()
 	err := s.doCompact(curTs)
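The algorithm comments added in the hunk above can be condensed into a small, hypothetical skeleton of the collect-and-purge phases. Plain maps stand in for the hotstore, the markset, and the concurrently maintained live set, and the per-batch re-check mirrors the "checked again for marks" step; this is a sketch of the idea, not the lotus implementation:

```go
package main

import "fmt"

// compactSketch collects unmarked hotstore keys as cold candidates, then
// purges them in small batches, re-checking a concurrently maintained live
// set before every delete so nothing referenced during compaction is dropped.
func compactSketch(hot map[string][]byte, marked, live map[string]bool, batchSize int) []string {
	// collect: anything in the hotstore that the chain walk did not mark
	var cold []string
	for k := range hot {
		if !marked[k] {
			cold = append(cold, k)
		}
	}

	// purge in batches, skipping anything that became live in the meantime
	var purged []string
	for i := 0; i < len(cold); i += batchSize {
		end := i + batchSize
		if end > len(cold) {
			end = len(cold)
		}
		for _, k := range cold[i:end] {
			if live[k] {
				continue // protected by a concurrent reference
			}
			delete(hot, k)
			purged = append(purged, k)
		}
	}
	return purged
}

func main() {
	hot := map[string][]byte{"a": nil, "b": nil, "c": nil}
	marked := map[string]bool{"a": true}
	live := map[string]bool{"c": true}
	fmt.Println(compactSketch(hot, marked, live, 2)) // [b]
}
```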
@@ -801,7 +818,8 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
 	// 0. Prepare the transaction
 	s.prepareTxnProtect(lookbackEpoch)

-	// 1. mark reachable objects by walking the chain from the current epoch to the boundary epoch
+	// 1. mark reachable objects by walking the chain from the current epoch; we keep state roots
+	// and messages until the boundary epoch.
 	log.Info("marking reachable objects")
 	startMark := time.Now()

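The reworded step 1 comment states the marking policy: headers are kept all the way down, while state roots and messages are only kept until the boundary epoch. A toy sketch of that rule, with a simplified `tipset` type that is not the lotus types.TipSet:

```go
package main

import "fmt"

// tipset is a simplified, hypothetical stand-in for a chain tipset.
type tipset struct {
	height                  int
	header, state, messages string
	parent                  *tipset
}

// markChain walks from the head towards genesis, marking every block header,
// and marking state roots and messages only down to the boundary epoch.
func markChain(head *tipset, boundary int, mark func(string)) {
	for ts := head; ts != nil; ts = ts.parent {
		mark(ts.header)
		if ts.height >= boundary {
			mark(ts.state)
			mark(ts.messages)
		}
	}
}

func main() {
	genesis := &tipset{height: 0, header: "h0", state: "s0", messages: "m0"}
	mid := &tipset{height: 5, header: "h5", state: "s5", messages: "m5", parent: genesis}
	head := &tipset{height: 10, header: "h10", state: "s10", messages: "m10", parent: mid}

	var marked []string
	markChain(head, 5, func(k string) { marked = append(marked, k) })
	fmt.Println(marked) // [h10 s10 m10 h5 s5 m5 h0]
}
```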
@@ -928,7 +946,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
 	})

 	if err != nil {
-		return xerrors.Errorf("error collecting candidate cold objects: %w", err)
+		return xerrors.Errorf("error collecting cold objects: %w", err)
 	}

 	log.Infow("cold collection done", "took", time.Since(startCollect))
@@ -958,7 +976,8 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
 	}

 	// 4. sort cold objects so that the dags with most references are deleted first
-	// this ensures that we can't refer to a dag with its constituents already deleted
+	// this ensures that we can't refer to a dag with its constituents already deleted, i.e.
+	// we leave no dangling references.
 	log.Info("sorting cold objects")
 	startSort := time.Now()
 	err = s.sortObjects(cold)
@@ -1144,7 +1163,7 @@ func (s *SplitStore) walkObject(c cid.Cid, walked *cid.Set, f func(cid.Cid) erro
 	return nil
 }

-// like walkObject, but the object may be potentially incomplete (references missing from the hotstore)
+// like walkObject, but the object may be potentially incomplete (references missing)
 func (s *SplitStore) walkObjectIncomplete(c cid.Cid, walked *cid.Set, f, missing func(cid.Cid) error) error {
 	if !walked.Visit(c) {
 		return nil
@@ -1238,20 +1257,6 @@ func (s *SplitStore) isOldBlockHeader(c cid.Cid, epoch abi.ChainEpoch) (isOldBlo
 	return isOldBlock, err
 }

-func (s *SplitStore) isBlockHeader(c cid.Cid) (isBlock bool, err error) {
-	if c.Prefix().Codec != cid.DagCBOR {
-		return false, nil
-	}
-
-	err = s.view(c, func(data []byte) error {
-		var hdr types.BlockHeader
-		isBlock = hdr.UnmarshalCBOR(bytes.NewBuffer(data)) == nil
-		return nil
-	})
-
-	return isBlock, err
-}
-
 func (s *SplitStore) moveColdBlocks(cold []cid.Cid) error {
 	batch := make([]blocks.Block, 0, batchSize)

@@ -1279,13 +1284,16 @@ func (s *SplitStore) moveColdBlocks(cold []cid.Cid) error {
 	if len(batch) > 0 {
 		err := s.cold.PutMany(batch)
 		if err != nil {
-			return xerrors.Errorf("error putting cold to coldstore: %w", err)
+			return xerrors.Errorf("error putting batch to coldstore: %w", err)
 		}
 	}

 	return nil
 }

+// sorts a slice of objects heaviest first -- it's a little expensive but worth the
+// guarantee that we don't leave dangling references behind, e.g. if we die in the middle
+// of a purge.
 func (s *SplitStore) sortObjects(cids []cid.Cid) error {
 	// we cache the keys to avoid making a gazillion of strings
 	keys := make(map[cid.Cid]string)
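The new sortObjects comment motivates heaviest-first ordering. A minimal sketch (not the lotus implementation; `weight` and the toy graph are illustrative) of why this avoids dangling references: if an object's weight counts its whole reachable subgraph, every DAG sorts before its constituents, so an interrupted purge never leaves a surviving parent pointing at a deleted child:

```go
package main

import (
	"fmt"
	"sort"
)

// weight returns the number of objects reachable from key (including itself),
// memoized in seen; it stands in for the "heaviness" used to order deletions.
func weight(key string, children map[string][]string, seen map[string]int) int {
	if w, ok := seen[key]; ok {
		return w
	}
	w := 1
	for _, c := range children[key] {
		w += weight(c, children, seen)
	}
	seen[key] = w
	return w
}

func main() {
	children := map[string][]string{
		"root": {"mid"},
		"mid":  {"leaf"},
	}
	cold := []string{"leaf", "root", "mid"}
	seen := make(map[string]int)
	// heaviest first: a DAG always precedes its constituents in the purge order
	sort.Slice(cold, func(i, j int) bool {
		return weight(cold[i], children, seen) > weight(cold[j], children, seen)
	})
	fmt.Println(cold) // [root mid leaf]
}
```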
@@ -1431,12 +1439,13 @@ func (s *SplitStore) purge(curTs *types.TipSet, cids []cid.Cid) error {
 	})
 }

-// I really don't like having this code, but we seem to have some DAG references with missing
-// constituents. During testing in mainnet *some* of these references *sometimes* appeared after a
-// little bit.
+// I really don't like having this code, but we seem to have some occasional DAG references with
+// missing constituents. During testing in mainnet *some* of these references *sometimes* appeared
+// after a little bit.
 // We need to figure out where they are coming from and eliminate that vector, but until then we
 // have this gem[TM].
-// My best guess is that they are parent message receipts or yet to be computed state roots.
+// My best guess is that they are parent message receipts or yet to be computed state roots; magik
+// thinks the cause may be block validation.
 func (s *SplitStore) waitForMissingRefs() {
 	s.txnLk.Lock()
 	missing := s.txnMissing