fix some residual purge races
This commit is contained in:
parent
68bc5d2291
commit
e4bb4be855
@ -236,17 +236,19 @@ func (s *SplitStore) Has(c cid.Cid) (bool, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if has {
|
if has {
|
||||||
// treat it as an implicit Write, absence options -- the vm uses this check to avoid duplicate
|
// treat it as an implicit (recursive) Write, absence options -- the vm uses this check to avoid
|
||||||
// writes on Flush. When we have options in the API, the vm can explicitly signal that this is
|
// duplicate writes on Copy.
|
||||||
// an implicit Write.
|
// When we have options in the API, the vm can explicitly signal that this is an implicit Write.
|
||||||
s.trackWrite(c, true)
|
s.trackWrite(c, true)
|
||||||
|
|
||||||
// also make sure the object is considered live during compaction in case we have already
|
// also make sure the object is considered live during compaction in case we have already
|
||||||
// flushed pending writes and started compaction
|
// flushed pending writes and started compaction.
|
||||||
|
// in case of a race with purge, this will return a track error, which we can use to
|
||||||
|
// signal to the vm that the object is not fully present.
|
||||||
trackErr := s.trackTxnRef(c, true)
|
trackErr := s.trackTxnRef(c, true)
|
||||||
|
|
||||||
// if we failed to track the object and all its dependencies, then return false so as
|
// if we failed to track the object and all its dependencies, then return false so as
|
||||||
// to cause the vm to recompute
|
// to cause the vm to copy
|
||||||
return trackErr == nil, nil
|
return trackErr == nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -673,6 +675,16 @@ func (s *SplitStore) trackTxnRef(c cid.Cid, implicit bool) error {
|
|||||||
err = s.txnProtect.Mark(c)
|
err = s.txnProtect.Mark(c)
|
||||||
} else {
|
} else {
|
||||||
err = s.walkLinks(c, cid.NewSet(), func(c cid.Cid) error {
|
err = s.walkLinks(c, cid.NewSet(), func(c cid.Cid) error {
|
||||||
|
// this check is necessary to avoid races because objects are purged in random order
|
||||||
|
has, err := s.hot.Has(c)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if !has {
|
||||||
|
return xerrors.Errorf("object (cid: %s) has been purged", c)
|
||||||
|
}
|
||||||
|
|
||||||
return s.txnProtect.Mark(c)
|
return s.txnProtect.Mark(c)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -949,28 +961,8 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
|
|||||||
return xerrors.Errorf("error creating mark set: %w", err)
|
return xerrors.Errorf("error creating mark set: %w", err)
|
||||||
}
|
}
|
||||||
defer markSet.Close() //nolint:errcheck
|
defer markSet.Close() //nolint:errcheck
|
||||||
|
|
||||||
// create the pruge protect filter
|
|
||||||
s.txnLk.Lock()
|
|
||||||
s.txnProtect, err = s.txnEnv.Create("protected", s.markSetSize)
|
|
||||||
if err != nil {
|
|
||||||
s.txnLk.Unlock()
|
|
||||||
return xerrors.Errorf("error creating transactional mark set: %w", err)
|
|
||||||
}
|
|
||||||
s.txnLk.Unlock()
|
|
||||||
|
|
||||||
defer func() {
|
|
||||||
s.txnLk.Lock()
|
|
||||||
_ = s.txnProtect.Close()
|
|
||||||
s.txnProtect = nil
|
|
||||||
s.txnLk.Unlock()
|
|
||||||
}()
|
|
||||||
|
|
||||||
defer s.debug.Flush()
|
defer s.debug.Flush()
|
||||||
|
|
||||||
// flush pending writes to update the tracker
|
|
||||||
s.flushPendingWrites(false)
|
|
||||||
|
|
||||||
// 1. mark reachable objects by walking the chain from the current epoch to the boundary epoch
|
// 1. mark reachable objects by walking the chain from the current epoch to the boundary epoch
|
||||||
log.Infow("marking reachable blocks", "currentEpoch", currentEpoch, "boundaryEpoch", boundaryEpoch)
|
log.Infow("marking reachable blocks", "currentEpoch", currentEpoch, "boundaryEpoch", boundaryEpoch)
|
||||||
startMark := time.Now()
|
startMark := time.Now()
|
||||||
@ -992,6 +984,28 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
|
|||||||
|
|
||||||
log.Infow("marking done", "took", time.Since(startMark), "marked", count)
|
log.Infow("marking done", "took", time.Since(startMark), "marked", count)
|
||||||
|
|
||||||
|
// create the transaction protect filter
|
||||||
|
s.txnLk.Lock()
|
||||||
|
s.txnProtect, err = s.txnEnv.Create("protected", s.markSetSize)
|
||||||
|
if err != nil {
|
||||||
|
s.txnLk.Unlock()
|
||||||
|
return xerrors.Errorf("error creating transactional mark set: %w", err)
|
||||||
|
}
|
||||||
|
s.txnLk.Unlock()
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
s.txnLk.Lock()
|
||||||
|
_ = s.txnProtect.Close()
|
||||||
|
s.txnProtect = nil
|
||||||
|
s.txnLk.Unlock()
|
||||||
|
}()
|
||||||
|
|
||||||
|
// flush pending writes to update the tracker
|
||||||
|
log.Info("flushing pending writes")
|
||||||
|
startFlush := time.Now()
|
||||||
|
s.flushPendingWrites(false)
|
||||||
|
log.Infow("flushing done", "took", time.Since(startFlush))
|
||||||
|
|
||||||
// 2. move cold unreachable objects to the coldstore
|
// 2. move cold unreachable objects to the coldstore
|
||||||
log.Info("collecting cold objects")
|
log.Info("collecting cold objects")
|
||||||
startCollect := time.Now()
|
startCollect := time.Now()
|
||||||
@ -1071,6 +1085,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
|
|||||||
}
|
}
|
||||||
log.Infow("moving done", "took", time.Since(startMove))
|
log.Infow("moving done", "took", time.Since(startMove))
|
||||||
}
|
}
|
||||||
|
|
||||||
// 2.3 purge cold objects from the hotstore
|
// 2.3 purge cold objects from the hotstore
|
||||||
log.Info("purging cold objects from the hotstore")
|
log.Info("purging cold objects from the hotstore")
|
||||||
startPurge := time.Now()
|
startPurge := time.Now()
|
||||||
@ -1293,7 +1308,7 @@ func (s *SplitStore) purge(curTs *types.TipSet, cids []cid.Cid) error {
|
|||||||
deadCids := make([]cid.Cid, 0, batchSize)
|
deadCids := make([]cid.Cid, 0, batchSize)
|
||||||
var purgeCnt, liveCnt int
|
var purgeCnt, liveCnt int
|
||||||
defer func() {
|
defer func() {
|
||||||
log.Infow("purged objects", "purged", purgeCnt, "live", liveCnt)
|
log.Infow("purged cold objects", "purged", purgeCnt, "live", liveCnt)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
return s.purgeBatch(cids,
|
return s.purgeBatch(cids,
|
||||||
|
Loading…
Reference in New Issue
Block a user