recursively protect all references
This commit is contained in:
parent
4d286da593
commit
f124389b66
@ -49,6 +49,10 @@ var (
|
|||||||
// we will walk the chain for live objects.
|
// we will walk the chain for live objects.
|
||||||
CompactionBoundary = 4 * build.Finality
|
CompactionBoundary = 4 * build.Finality
|
||||||
|
|
||||||
|
// CompactionLookback is the number of epochs from the current epoch at which
|
||||||
|
// we will consider marking an old block reference.
|
||||||
|
CompactionLookback = 2 * build.Finality
|
||||||
|
|
||||||
// SyncGapTime is the time delay from a tipset's min timestamp before we decide
|
// SyncGapTime is the time delay from a tipset's min timestamp before we decide
|
||||||
// there is a sync gap
|
// there is a sync gap
|
||||||
SyncGapTime = time.Minute
|
SyncGapTime = time.Minute
|
||||||
@ -143,7 +147,7 @@ type SplitStore struct {
|
|||||||
// protection for concurrent read/writes during compaction
|
// protection for concurrent read/writes during compaction
|
||||||
txnLk sync.RWMutex
|
txnLk sync.RWMutex
|
||||||
txnActive bool
|
txnActive bool
|
||||||
txnBoundaryEpoch abi.ChainEpoch
|
txnLookbackEpoch abi.ChainEpoch
|
||||||
txnEnv MarkSetEnv
|
txnEnv MarkSetEnv
|
||||||
txnProtect MarkSet
|
txnProtect MarkSet
|
||||||
txnMarkSet MarkSet
|
txnMarkSet MarkSet
|
||||||
@ -585,7 +589,7 @@ func (s *SplitStore) updateWriteEpoch() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SplitStore) trackTxnRef(c cid.Cid, recursive bool) error {
|
func (s *SplitStore) trackTxnRef(c cid.Cid, deep bool) error {
|
||||||
if !s.txnActive {
|
if !s.txnActive {
|
||||||
// not compacting
|
// not compacting
|
||||||
return nil
|
return nil
|
||||||
@ -600,12 +604,53 @@ func (s *SplitStore) trackTxnRef(c cid.Cid, recursive bool) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// we have finished marking, protect the reference
|
// we have finished marking, protect the reference
|
||||||
if !recursive {
|
if !deep {
|
||||||
// shallow protect
|
return s.doTxnProtect(c)
|
||||||
return s.txnProtect.Mark(c)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// it's a recursive reference in vm context, protect links if they are not in the markset already
|
return s.doTxnProtectDeep(c)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *SplitStore) doTxnProtect(c cid.Cid) error {
|
||||||
|
// it's a shallow reference, protect with a standard walk without occur checking
|
||||||
|
return s.walkObject(c, cid.NewSet(),
|
||||||
|
func(c cid.Cid) error {
|
||||||
|
mark, err := s.txnMarkSet.Has(c)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("error checking mark set for %s: %w", c, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// it's marked, nothing to do
|
||||||
|
if mark {
|
||||||
|
return errStopWalk
|
||||||
|
}
|
||||||
|
|
||||||
|
mark, err = s.txnProtect.Has(c)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("error checking mark set for %s: %w", c, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// it's protected, nothing to do
|
||||||
|
if mark {
|
||||||
|
return errStopWalk
|
||||||
|
}
|
||||||
|
|
||||||
|
// old block reference -- see comment in doCompact about the necessity of this
|
||||||
|
isOldBlock, err := s.isOldBlockHeader(c, s.txnLookbackEpoch)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("error checking object type for %s: %w", c, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if isOldBlock {
|
||||||
|
return errStopWalk
|
||||||
|
}
|
||||||
|
|
||||||
|
return s.txnProtect.Mark(c)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *SplitStore) doTxnProtectDeep(c cid.Cid) error {
|
||||||
|
// it's a deep reference potentially in vm context
|
||||||
// we do a deep walk to visit the children first, short-circuiting if the parent has been marked.
|
// we do a deep walk to visit the children first, short-circuiting if the parent has been marked.
|
||||||
// the deep walk is necessary as internal references may be missing, e.g. because a defunct object
|
// the deep walk is necessary as internal references may be missing, e.g. because a defunct object
|
||||||
// got recreated by the VM.
|
// got recreated by the VM.
|
||||||
@ -622,7 +667,7 @@ func (s *SplitStore) trackTxnRef(c cid.Cid, recursive bool) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// old block reference -- see comment in doCompact about the necessity of this
|
// old block reference -- see comment in doCompact about the necessity of this
|
||||||
isOldBlock, err := s.isOldBlockHeader(c, s.txnBoundaryEpoch)
|
isOldBlock, err := s.isOldBlockHeader(c, s.txnLookbackEpoch)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("error checking object type for %s: %w", c, err)
|
return xerrors.Errorf("error checking object type for %s: %w", c, err)
|
||||||
}
|
}
|
||||||
@ -795,8 +840,9 @@ func (s *SplitStore) compact(curTs *types.TipSet) {
|
|||||||
func (s *SplitStore) doCompact(curTs *types.TipSet) error {
|
func (s *SplitStore) doCompact(curTs *types.TipSet) error {
|
||||||
currentEpoch := curTs.Height()
|
currentEpoch := curTs.Height()
|
||||||
boundaryEpoch := currentEpoch - CompactionBoundary
|
boundaryEpoch := currentEpoch - CompactionBoundary
|
||||||
|
lookbackEpoch := currentEpoch - CompactionLookback
|
||||||
|
|
||||||
log.Infow("running compaction", "currentEpoch", currentEpoch, "baseEpoch", s.baseEpoch, "boundaryEpoch", boundaryEpoch)
|
log.Infow("running compaction", "currentEpoch", currentEpoch, "baseEpoch", s.baseEpoch, "boundaryEpoch", boundaryEpoch, "lookbackEpoch", lookbackEpoch)
|
||||||
|
|
||||||
markSet, err := s.markSetEnv.Create("live", s.markSetSize)
|
markSet, err := s.markSetEnv.Create("live", s.markSetSize)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -809,7 +855,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
|
|||||||
s.txnLk.Lock()
|
s.txnLk.Lock()
|
||||||
s.txnRefs = make(map[cid.Cid]struct{})
|
s.txnRefs = make(map[cid.Cid]struct{})
|
||||||
s.txnActive = true
|
s.txnActive = true
|
||||||
s.txnBoundaryEpoch = boundaryEpoch
|
s.txnLookbackEpoch = lookbackEpoch
|
||||||
s.txnLk.Unlock()
|
s.txnLk.Unlock()
|
||||||
|
|
||||||
// 1. mark reachable objects by walking the chain from the current epoch to the boundary epoch
|
// 1. mark reachable objects by walking the chain from the current epoch to the boundary epoch
|
||||||
@ -893,7 +939,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
|
|||||||
//
|
//
|
||||||
// Ideally, we would have API options to preclude us from trcking references to such
|
// Ideally, we would have API options to preclude us from trcking references to such
|
||||||
// objects, but we don't so we have to do this check
|
// objects, but we don't so we have to do this check
|
||||||
isOldBlock, err := s.isOldBlockHeader(c, boundaryEpoch)
|
isOldBlock, err := s.isOldBlockHeader(c, lookbackEpoch)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("error checking object type for %s: %w", c, err)
|
return xerrors.Errorf("error checking object type for %s: %w", c, err)
|
||||||
}
|
}
|
||||||
@ -960,7 +1006,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// see comment above for this check
|
// see comment above for this check
|
||||||
isOldBlock, err := s.isOldBlockHeader(c, boundaryEpoch)
|
isOldBlock, err := s.isOldBlockHeader(c, lookbackEpoch)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("error checking object type for %s: %w", c, err)
|
return xerrors.Errorf("error checking object type for %s: %w", c, err)
|
||||||
}
|
}
|
||||||
|
@ -24,6 +24,7 @@ import (
|
|||||||
func init() {
|
func init() {
|
||||||
CompactionThreshold = 5
|
CompactionThreshold = 5
|
||||||
CompactionBoundary = 2
|
CompactionBoundary = 2
|
||||||
|
CompactionLookback = 2
|
||||||
logging.SetLogLevel("splitstore", "DEBUG")
|
logging.SetLogLevel("splitstore", "DEBUG")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user