simplify transactional protection logic
Now that we delete objects heaviest first, we don't have to do deep walk and rescan gymnastics.
This commit is contained in:
parent
40c271cda1
commit
f33d4e79aa
@ -77,8 +77,6 @@ var (
|
||||
|
||||
// used to signal end of walk
|
||||
errStopWalk = errors.New("stop walk")
|
||||
// used to signal a missing object when protecting recursive references
|
||||
errMissingObject = errors.New("missing object")
|
||||
|
||||
// set this to true if you are debugging the splitstore to enable debug logging
|
||||
enableDebugLog = false
|
||||
@ -226,17 +224,7 @@ func (s *SplitStore) Has(c cid.Cid) (bool, error) {
|
||||
}
|
||||
|
||||
if has {
|
||||
// treat it as an implicit (recursive) Write, when it is within vm.Copy context.
|
||||
// -- the vm uses this check to avoid duplicate writes on Copy.
|
||||
// When we have options in the API (or something better), the vm can explicitly signal
|
||||
// that this is an implicit Write.
|
||||
err = s.trackTxnRef(c, true)
|
||||
if xerrors.Is(err, errMissingObject) {
|
||||
// we failed to recursively protect the object because some inner object has been purged;
|
||||
// signal to the VM to copy.
|
||||
return false, nil
|
||||
}
|
||||
|
||||
err = s.trackTxnRef(c)
|
||||
return true, err
|
||||
}
|
||||
|
||||
@ -251,7 +239,7 @@ func (s *SplitStore) Get(cid cid.Cid) (blocks.Block, error) {
|
||||
|
||||
switch err {
|
||||
case nil:
|
||||
err = s.trackTxnRef(cid, false)
|
||||
err = s.trackTxnRef(cid)
|
||||
return blk, err
|
||||
|
||||
case bstore.ErrNotFound:
|
||||
@ -285,7 +273,7 @@ func (s *SplitStore) GetSize(cid cid.Cid) (int, error) {
|
||||
|
||||
switch err {
|
||||
case nil:
|
||||
err = s.trackTxnRef(cid, false)
|
||||
err = s.trackTxnRef(cid)
|
||||
return size, err
|
||||
|
||||
case bstore.ErrNotFound:
|
||||
@ -323,7 +311,7 @@ func (s *SplitStore) Put(blk blocks.Block) error {
|
||||
s.mx.Unlock()
|
||||
s.debug.LogWrite(curTs, blk, writeEpoch)
|
||||
}
|
||||
err = s.trackTxnRef(blk.Cid(), false)
|
||||
err = s.trackTxnRef(blk.Cid())
|
||||
}
|
||||
|
||||
return err
|
||||
@ -400,7 +388,7 @@ func (s *SplitStore) View(cid cid.Cid, cb func([]byte) error) error {
|
||||
err := s.hot.View(cid, cb)
|
||||
switch err {
|
||||
case nil:
|
||||
err = s.trackTxnRef(cid, false)
|
||||
err = s.trackTxnRef(cid)
|
||||
return err
|
||||
|
||||
case bstore.ErrNotFound:
|
||||
@ -590,7 +578,7 @@ func (s *SplitStore) updateWriteEpoch() {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *SplitStore) trackTxnRef(c cid.Cid, deep bool) error {
|
||||
func (s *SplitStore) trackTxnRef(c cid.Cid) error {
|
||||
if !s.txnActive {
|
||||
// not compacting
|
||||
return nil
|
||||
@ -605,11 +593,7 @@ func (s *SplitStore) trackTxnRef(c cid.Cid, deep bool) error {
|
||||
}
|
||||
|
||||
// we have finished marking, protect the reference
|
||||
if !deep {
|
||||
return s.doTxnProtect(c, nil)
|
||||
}
|
||||
|
||||
return s.doTxnProtectDeep(c)
|
||||
return s.doTxnProtect(c, nil)
|
||||
}
|
||||
|
||||
func (s *SplitStore) trackTxnRefMany(cids []cid.Cid) error {
|
||||
@ -645,8 +629,10 @@ func (s *SplitStore) trackTxnRefMany(cids []cid.Cid) error {
|
||||
}
|
||||
|
||||
func (s *SplitStore) doTxnProtect(root cid.Cid, batch map[cid.Cid]struct{}) error {
|
||||
// it's a shallow reference, protect with a standard walk without occur checking
|
||||
return s.walkObject(root, cid.NewSet(),
|
||||
// Note: cold objects are deleted heaviest first, so the consituents of an object
|
||||
// cannot be deleted before the object itself.
|
||||
// so we just do a regular walk and mark in the protected set.
|
||||
err := s.walkObject(root, cid.NewSet(),
|
||||
func(c cid.Cid) error {
|
||||
if c != root {
|
||||
_, ok := batch[c]
|
||||
@ -690,58 +676,12 @@ func (s *SplitStore) doTxnProtect(root cid.Cid, batch map[cid.Cid]struct{}) erro
|
||||
|
||||
return s.txnProtect.Mark(c)
|
||||
})
|
||||
}
|
||||
|
||||
func (s *SplitStore) doTxnProtectDeep(root cid.Cid) error {
|
||||
// it's a deep reference potentially in vm context
|
||||
// we do a deep walk to visit the children first, short-circuiting if the parent has been marked.
|
||||
// the deep walk is necessary as internal references may be missing, e.g. because a defunct object
|
||||
// got recreated by the VM.
|
||||
return s.walkObjectDeep(root, cid.NewSet(),
|
||||
func(c cid.Cid) error {
|
||||
mark, err := s.txnMarkSet.Has(c)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error checking mark set for %s: %w", c, err)
|
||||
}
|
||||
if err != nil {
|
||||
log.Warnf("error protecting object (cid: %s): %s", root, err)
|
||||
}
|
||||
|
||||
// it's marked, nothing to do
|
||||
if mark {
|
||||
return errStopWalk
|
||||
}
|
||||
|
||||
// old block reference -- see comment in doCompact about the necessity of this
|
||||
isOldBlock, err := s.isOldBlockHeader(c, s.txnLookbackEpoch)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error checking object type for %s: %w", c, err)
|
||||
}
|
||||
|
||||
if isOldBlock {
|
||||
return errStopWalk
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
func(c cid.Cid) error {
|
||||
// this occurs check is necessary because cold objects are purged in arbitrary order
|
||||
has, err := s.hot.Has(c)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error checking hotstore for %s: %w", c, err)
|
||||
}
|
||||
|
||||
// it's not there (might have been deleted), signal to the vm to copy
|
||||
if !has {
|
||||
log.Warnf("missing object for recursive reference to %s", c)
|
||||
return errMissingObject
|
||||
}
|
||||
|
||||
// mark it in *both* sets, so that we can short-circuit a concurrent walk.
|
||||
err = s.txnMarkSet.Mark(c)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error marking %s: %w", c, err)
|
||||
}
|
||||
|
||||
return s.txnProtect.Mark(c)
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *SplitStore) warmup(curTs *types.TipSet) error {
|
||||
@ -917,7 +857,6 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
|
||||
}()
|
||||
|
||||
// 1.1 Update markset for references created during marking
|
||||
var missing []cid.Cid
|
||||
if len(txnRefs) > 0 {
|
||||
log.Info("updating mark set for live references", "refs", len(txnRefs))
|
||||
startMark = time.Now()
|
||||
@ -934,9 +873,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
|
||||
continue
|
||||
}
|
||||
|
||||
// we have to do a deep walk here, as an early mark would stick even if there are
|
||||
// missing references that haven't been written yet!
|
||||
err = s.walkObjectDeep(c, walked,
|
||||
err = s.walkObject(c, walked,
|
||||
func(c cid.Cid) error {
|
||||
mark, err := markSet.Has(c)
|
||||
if err != nil {
|
||||
@ -964,92 +901,16 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
|
||||
return errStopWalk
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
func(c cid.Cid) error {
|
||||
count++
|
||||
return markSet.Mark(c)
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
if xerrors.Is(err, bstore.ErrNotFound) {
|
||||
log.Warnf("missing or incomplete object: %s", c)
|
||||
missing = append(missing, c)
|
||||
} else {
|
||||
return xerrors.Errorf("error walking %s for marking: %w", c, err)
|
||||
}
|
||||
return xerrors.Errorf("error walking %s for marking: %w", c, err)
|
||||
}
|
||||
}
|
||||
|
||||
log.Infow("update marking set done", "took", time.Since(startMark), "marked", count, "missing", len(missing))
|
||||
}
|
||||
|
||||
// 1.2 rescan for missing objects (after waiting a bit), as they might have not been copied yet
|
||||
// by the vm at the time of the update walk.
|
||||
if len(missing) > 0 {
|
||||
try := 0
|
||||
|
||||
log.Info("rescanning for missing objects")
|
||||
startMark = time.Now()
|
||||
count = 0
|
||||
|
||||
for len(missing) > 0 {
|
||||
if try > maxMissingScanRetries {
|
||||
return xerrors.Errorf("failed to fully scan transactional refs; %d missing objects after %d attempts", len(missing), try)
|
||||
}
|
||||
|
||||
// discard previous walk short-cuts
|
||||
walked := cid.NewSet()
|
||||
towalk := missing
|
||||
missing = nil
|
||||
try++
|
||||
|
||||
log.Infof("rescanning for %d missing objects (attempt %d)", len(towalk), try)
|
||||
// wait a minute first for in-flight writes to complete
|
||||
time.Sleep(time.Minute)
|
||||
|
||||
for _, c := range towalk {
|
||||
// deep walk here again, as we are concerned about internal references not having been written
|
||||
err = s.walkObjectDeep(c, walked,
|
||||
func(c cid.Cid) error {
|
||||
mark, err := markSet.Has(c)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error checking mark set for %s: %w", c, err)
|
||||
}
|
||||
|
||||
if mark {
|
||||
return errStopWalk
|
||||
}
|
||||
|
||||
// see comment above for this check
|
||||
isOldBlock, err := s.isOldBlockHeader(c, lookbackEpoch)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error checking object type for %s: %w", c, err)
|
||||
}
|
||||
|
||||
if isOldBlock {
|
||||
return errStopWalk
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
func(c cid.Cid) error {
|
||||
count++
|
||||
return markSet.Mark(c)
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
if xerrors.Is(err, bstore.ErrNotFound) {
|
||||
log.Warnf("missing or incomplete object: %s", c)
|
||||
missing = append(missing, c)
|
||||
} else {
|
||||
return xerrors.Errorf("error walking %s for marking: %w", c, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
log.Infow("rescanning done", "took", time.Since(startMark), "marked", count)
|
||||
log.Infow("update marking set done", "took", time.Since(startMark), "marked", count)
|
||||
}
|
||||
|
||||
// 2. iterate through the hotstore to collect cold objects
|
||||
|
Loading…
Reference in New Issue
Block a user