use deep object walking for more robust handling of transactional references
This commit is contained in:
parent
1f02428225
commit
680af8eb09
@ -141,13 +141,14 @@ type SplitStore struct {
|
||||
debug *debugLog
|
||||
|
||||
// protection for concurrent read/writes during compaction
|
||||
txnLk sync.RWMutex
|
||||
txnActive bool
|
||||
txnEnv MarkSetEnv
|
||||
txnProtect MarkSet
|
||||
txnMarkSet MarkSet
|
||||
txnRefsMx sync.Mutex
|
||||
txnRefs map[cid.Cid]struct{}
|
||||
txnLk sync.RWMutex
|
||||
txnActive bool
|
||||
txnBoundaryEpoch abi.ChainEpoch
|
||||
txnEnv MarkSetEnv
|
||||
txnProtect MarkSet
|
||||
txnMarkSet MarkSet
|
||||
txnRefsMx sync.Mutex
|
||||
txnRefs map[cid.Cid]struct{}
|
||||
}
|
||||
|
||||
var _ bstore.Blockstore = (*SplitStore)(nil)
|
||||
@ -600,45 +601,59 @@ func (s *SplitStore) trackTxnRef(c cid.Cid, recursive bool) error {
|
||||
|
||||
// we have finished marking, protect the reference
|
||||
if !recursive {
|
||||
// shallow protect
|
||||
return s.txnProtect.Mark(c)
|
||||
}
|
||||
|
||||
// it's a recursive reference in vm context, protect links if they are not in the markset already
|
||||
return s.walkObject(c, cid.NewSet(), func(c cid.Cid) error {
|
||||
mark, err := s.txnMarkSet.Has(c)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error checking mark set for %s: %w", c, err)
|
||||
}
|
||||
// we do a deep walk to visit the children first, short-circuiting if the parent has been marked.
|
||||
// the deep walk is necessary as internal references may be missing, e.g. because a defunct object
|
||||
// got recreated by the VM.
|
||||
return s.walkObjectDeep(c, cid.NewSet(),
|
||||
func(c cid.Cid) error {
|
||||
mark, err := s.txnMarkSet.Has(c)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error checking mark set for %s: %w", c, err)
|
||||
}
|
||||
|
||||
// it's marked, nothing to do
|
||||
if mark {
|
||||
return errStopWalk
|
||||
}
|
||||
// it's marked, nothing to do
|
||||
if mark {
|
||||
return errStopWalk
|
||||
}
|
||||
|
||||
live, err := s.txnProtect.Has(c)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error checking portected set for %s: %w", c, err)
|
||||
}
|
||||
// old block reference -- see comment in doCompact about the necessity of this
|
||||
isOldBlock, err := s.isOldBlockHeader(c, s.txnBoundaryEpoch)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error checking object type for %s: %w", c, err)
|
||||
}
|
||||
|
||||
if live {
|
||||
return errStopWalk
|
||||
}
|
||||
if isOldBlock {
|
||||
return errStopWalk
|
||||
}
|
||||
|
||||
// this occurs check is necessary because cold objects are purged in arbitrary order
|
||||
has, err := s.hot.Has(c)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error checking hotstore for %s: %w", c, err)
|
||||
}
|
||||
return nil
|
||||
},
|
||||
func(c cid.Cid) error {
|
||||
// this occurs check is necessary because cold objects are purged in arbitrary order
|
||||
has, err := s.hot.Has(c)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error checking hotstore for %s: %w", c, err)
|
||||
}
|
||||
|
||||
// it's not there (might have been deleted), signal to the vm to copy
|
||||
if !has {
|
||||
log.Warnf("missing object for recursive reference to %s", c)
|
||||
return errMissingObject
|
||||
}
|
||||
// it's not there (might have been deleted), signal to the vm to copy
|
||||
if !has {
|
||||
log.Warnf("missing object for recursive reference to %s", c)
|
||||
return errMissingObject
|
||||
}
|
||||
|
||||
// mark it
|
||||
return s.txnProtect.Mark(c)
|
||||
})
|
||||
// mark it in *both* sets, so that we can short-circuit a concurrent walk.
|
||||
err = s.txnMarkSet.Mark(c)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error marking %s: %w", c, err)
|
||||
}
|
||||
|
||||
return s.txnProtect.Mark(c)
|
||||
})
|
||||
}
|
||||
|
||||
func (s *SplitStore) trackTxnRefMany(cids []cid.Cid) error {
|
||||
@ -657,7 +672,7 @@ func (s *SplitStore) trackTxnRefMany(cids []cid.Cid) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// we have finished marking, protect the refs
|
||||
// we have finished marking, shallow protect the refs
|
||||
for _, c := range cids {
|
||||
err := s.txnProtect.Mark(c)
|
||||
if err != nil {
|
||||
@ -794,6 +809,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
|
||||
s.txnLk.Lock()
|
||||
s.txnRefs = make(map[cid.Cid]struct{})
|
||||
s.txnActive = true
|
||||
s.txnBoundaryEpoch = boundaryEpoch
|
||||
s.txnLk.Unlock()
|
||||
|
||||
// 1. mark reachable objects by walking the chain from the current epoch to the boundary epoch
|
||||
@ -839,61 +855,89 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
|
||||
}()
|
||||
|
||||
// 1.1 Update markset for references created during marking
|
||||
log.Info("updating mark set for live references")
|
||||
startMark = time.Now()
|
||||
walked := cid.NewSet()
|
||||
count = 0
|
||||
var missing []cid.Cid
|
||||
for c := range txnRefs {
|
||||
mark, err := markSet.Has(c)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error checking markset for %s: %w", c, err)
|
||||
}
|
||||
if len(txnRefs) > 0 {
|
||||
log.Info("updating mark set for live references", "refs", len(txnRefs))
|
||||
startMark = time.Now()
|
||||
walked := cid.NewSet()
|
||||
count = 0
|
||||
|
||||
if mark {
|
||||
continue
|
||||
}
|
||||
|
||||
err = s.walkObject(c, walked, func(c cid.Cid) error {
|
||||
for c := range txnRefs {
|
||||
mark, err := markSet.Has(c)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error checking markset for %s: %w", c, err)
|
||||
}
|
||||
|
||||
if mark {
|
||||
return errStopWalk
|
||||
continue
|
||||
}
|
||||
|
||||
count++
|
||||
return markSet.Mark(c)
|
||||
})
|
||||
// we have to do a deep walk here, as an early mark would stick even if there are
|
||||
// missing references that haven't been written yet!
|
||||
err = s.walkObjectDeep(c, walked,
|
||||
func(c cid.Cid) error {
|
||||
mark, err := markSet.Has(c)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error checking markset for %s: %w", c, err)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
if xerrors.Is(err, bstore.ErrNotFound) {
|
||||
log.Warnf("missing or incomplete object: %s", c)
|
||||
missing = append(missing, c)
|
||||
} else {
|
||||
return xerrors.Errorf("error walking %s for marking: %w", c, err)
|
||||
if mark {
|
||||
return errStopWalk
|
||||
}
|
||||
|
||||
// we also short-circuit on old blocks, as these can come from a network request
|
||||
// and cause us to fail because we have purged its consistituents (or missing from
|
||||
// the beginning in case of snapshot sync, e.g. parent message receipts or old messages)
|
||||
// if these blocks are on our chain, they would have been marked but they might be
|
||||
// from a fork.
|
||||
//
|
||||
// Ideally, we would have API options to preclude us from trcking references to such
|
||||
// objects, but we don't so we have to do this check
|
||||
isOldBlock, err := s.isOldBlockHeader(c, boundaryEpoch)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error checking object type for %s: %w", c, err)
|
||||
}
|
||||
|
||||
if isOldBlock {
|
||||
return errStopWalk
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
func(c cid.Cid) error {
|
||||
count++
|
||||
return markSet.Mark(c)
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
if xerrors.Is(err, bstore.ErrNotFound) {
|
||||
log.Warnf("missing or incomplete object: %s", c)
|
||||
missing = append(missing, c)
|
||||
} else {
|
||||
return xerrors.Errorf("error walking %s for marking: %w", c, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
log.Infow("update marking set done", "took", time.Since(startMark), "marked", count, "missing", len(missing))
|
||||
|
||||
// 1.2 rescan for missing objects (after waiting a minute), as they might have not been copied yet
|
||||
// by the vm
|
||||
log.Infow("update marking set done", "took", time.Since(startMark), "marked", count, "missing", len(missing))
|
||||
}
|
||||
|
||||
// 1.2 rescan for missing objects (after waiting a bit), as they might have not been copied yet
|
||||
// by the vm at the time of the update walk.
|
||||
if len(missing) > 0 {
|
||||
try := 0
|
||||
|
||||
log.Info("rescanning for missing objects")
|
||||
startMark = time.Now()
|
||||
count = 0
|
||||
|
||||
for len(missing) > 0 {
|
||||
if try > maxMissingScanRetries {
|
||||
return xerrors.Errorf("failed to fully scan transactional refs; %d missing objects", len(missing))
|
||||
return xerrors.Errorf("failed to fully scan transactional refs; %d missing objects after %d attempts", len(missing), try)
|
||||
}
|
||||
|
||||
// discard previous walk short-cuts
|
||||
walked = cid.NewSet()
|
||||
walked := cid.NewSet()
|
||||
towalk := missing
|
||||
missing = nil
|
||||
try++
|
||||
@ -903,44 +947,34 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
|
||||
time.Sleep(time.Minute)
|
||||
|
||||
for _, c := range towalk {
|
||||
// we can't reliably check the markset and short-circuit this time, we have to do full walks
|
||||
// because the object was previously visited top-to-bottom, with root DAGs short circuiting
|
||||
// their children.
|
||||
// but we *can* short-circuit on the txn protection filter, as this implies that the object
|
||||
// will be protected from purge.
|
||||
err = s.walkObject(c, walked, func(c cid.Cid) error {
|
||||
mark, err := s.txnProtect.Has(c)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error checking protected set for %s: %w", c, err)
|
||||
}
|
||||
// deep walk here again, as we are concerned about internal references not having been written
|
||||
err = s.walkObjectDeep(c, walked,
|
||||
func(c cid.Cid) error {
|
||||
mark, err := markSet.Has(c)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error checking mark set for %s: %w", c, err)
|
||||
}
|
||||
|
||||
if mark {
|
||||
return errStopWalk
|
||||
}
|
||||
if mark {
|
||||
return errStopWalk
|
||||
}
|
||||
|
||||
// mark it
|
||||
err = markSet.Mark(c)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// see comment above for this check
|
||||
isOldBlock, err := s.isOldBlockHeader(c, boundaryEpoch)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error checking object type for %s: %w", c, err)
|
||||
}
|
||||
|
||||
// we also short-circuit in case of a block header, as it may cause us to walk the
|
||||
// entire chain because of a network request (and fail if we were synced form a snapshot
|
||||
// because of missing messages or receipts!)
|
||||
// this is necessary because we don't have interface options to signal network request
|
||||
// initiated API calls; when we have that, we can stop tracking those references and
|
||||
// we can remove this check.
|
||||
isBlock, err := s.isBlockHeader(c)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error checking object type for %s: %W", c, err)
|
||||
}
|
||||
if isOldBlock {
|
||||
return errStopWalk
|
||||
}
|
||||
|
||||
if isBlock {
|
||||
return errStopWalk
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
return nil
|
||||
},
|
||||
func(c cid.Cid) error {
|
||||
count++
|
||||
return markSet.Mark(c)
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
if xerrors.Is(err, bstore.ErrNotFound) {
|
||||
@ -953,7 +987,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
|
||||
}
|
||||
}
|
||||
|
||||
log.Infow("rescanning done", "took", time.Since(startMark))
|
||||
log.Infow("rescanning done", "took", time.Since(startMark), "marked", count)
|
||||
}
|
||||
|
||||
// 2. iterate through the hotstore to collect cold objects
|
||||
@ -1112,20 +1146,6 @@ func (s *SplitStore) walkChain(ts *types.TipSet, boundary abi.ChainEpoch, inclMs
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *SplitStore) isBlockHeader(c cid.Cid) (isBlock bool, err error) {
|
||||
if c.Prefix().Codec != cid.DagCBOR {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
err = s.view(c, func(data []byte) error {
|
||||
var hdr types.BlockHeader
|
||||
isBlock = hdr.UnmarshalCBOR(bytes.NewBuffer(data)) == nil
|
||||
return nil
|
||||
})
|
||||
|
||||
return isBlock, err
|
||||
}
|
||||
|
||||
func (s *SplitStore) walkObject(c cid.Cid, walked *cid.Set, f func(cid.Cid) error) error {
|
||||
if !walked.Visit(c) {
|
||||
return nil
|
||||
@ -1164,6 +1184,47 @@ func (s *SplitStore) walkObject(c cid.Cid, walked *cid.Set, f func(cid.Cid) erro
|
||||
return nil
|
||||
}
|
||||
|
||||
// like walkObject, but it visits leaves first, with pre invoked at the parent node to control
|
||||
// whether the walk should stop
|
||||
func (s *SplitStore) walkObjectDeep(c cid.Cid, walked *cid.Set,
|
||||
pre func(cid.Cid) error, f func(cid.Cid) error) error {
|
||||
if !walked.Visit(c) {
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := pre(c); err != nil {
|
||||
if err == errStopWalk {
|
||||
return nil
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
if c.Prefix().Codec != cid.DagCBOR {
|
||||
return f(c)
|
||||
}
|
||||
|
||||
var links []cid.Cid
|
||||
err := s.view(c, func(data []byte) error {
|
||||
return cbg.ScanForLinks(bytes.NewReader(data), func(c cid.Cid) {
|
||||
links = append(links, c)
|
||||
})
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error scanning linked block (cid: %s): %w", c, err)
|
||||
}
|
||||
|
||||
for _, c := range links {
|
||||
err := s.walkObjectDeep(c, walked, pre, f)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error walking link (cid: %s): %w", c, err)
|
||||
}
|
||||
}
|
||||
|
||||
return f(c)
|
||||
}
|
||||
|
||||
// internal version used by walk
|
||||
func (s *SplitStore) view(cid cid.Cid, cb func([]byte) error) error {
|
||||
err := s.hot.View(cid, cb)
|
||||
@ -1176,6 +1237,22 @@ func (s *SplitStore) view(cid cid.Cid, cb func([]byte) error) error {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *SplitStore) isOldBlockHeader(c cid.Cid, epoch abi.ChainEpoch) (isOldBlock bool, err error) {
|
||||
if c.Prefix().Codec != cid.DagCBOR {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
err = s.view(c, func(data []byte) error {
|
||||
var hdr types.BlockHeader
|
||||
if hdr.UnmarshalCBOR(bytes.NewBuffer(data)) == nil {
|
||||
isOldBlock = hdr.Height < epoch
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
return isOldBlock, err
|
||||
}
|
||||
|
||||
func (s *SplitStore) moveColdBlocks(cold []cid.Cid) error {
|
||||
batch := make([]blocks.Block, 0, batchSize)
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user