deal with incomplete objects that need to be marked and protected
seems that something is writing DAGs before its consituents, which causes problems.
This commit is contained in:
parent
db53859e7a
commit
1726eb993c
@ -148,6 +148,7 @@ type SplitStore struct {
|
|||||||
txnEnv MarkSetEnv
|
txnEnv MarkSetEnv
|
||||||
txnProtect MarkSet
|
txnProtect MarkSet
|
||||||
txnMarkSet MarkSet
|
txnMarkSet MarkSet
|
||||||
|
txnMissing map[cid.Cid]struct{}
|
||||||
txnRefsMx sync.Mutex
|
txnRefsMx sync.Mutex
|
||||||
txnRefs map[cid.Cid]struct{}
|
txnRefs map[cid.Cid]struct{}
|
||||||
}
|
}
|
||||||
@ -629,15 +630,12 @@ func (s *SplitStore) trackTxnRefMany(cids []cid.Cid) error {
|
|||||||
func (s *SplitStore) doTxnProtect(root cid.Cid, batch map[cid.Cid]struct{}) error {
|
func (s *SplitStore) doTxnProtect(root cid.Cid, batch map[cid.Cid]struct{}) error {
|
||||||
// Note: cold objects are deleted heaviest first, so the consituents of an object
|
// Note: cold objects are deleted heaviest first, so the consituents of an object
|
||||||
// cannot be deleted before the object itself.
|
// cannot be deleted before the object itself.
|
||||||
// so we just do a regular walk and mark in the protected set.
|
err := s.walkObjectIncomplete(root, cid.NewSet(),
|
||||||
err := s.walkObject(root, cid.NewSet(),
|
|
||||||
func(c cid.Cid) error {
|
func(c cid.Cid) error {
|
||||||
if c != root {
|
if c != root {
|
||||||
_, ok := batch[c]
|
_, ok := batch[c]
|
||||||
if ok {
|
if ok {
|
||||||
// it's on the same batch, stop walk
|
// it's on the same batch, stop walk
|
||||||
// this check is necessary as the object may contain references to objects
|
|
||||||
// in the same batch (yet to be written) that cannot be loaded for the walk
|
|
||||||
return errStopWalk
|
return errStopWalk
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -673,6 +671,11 @@ func (s *SplitStore) doTxnProtect(root cid.Cid, batch map[cid.Cid]struct{}) erro
|
|||||||
}
|
}
|
||||||
|
|
||||||
return s.txnProtect.Mark(c)
|
return s.txnProtect.Mark(c)
|
||||||
|
},
|
||||||
|
func(c cid.Cid) error {
|
||||||
|
log.Warnf("missing object %s in %s", c, root)
|
||||||
|
s.txnMissing[c] = struct{}{}
|
||||||
|
return errStopWalk
|
||||||
})
|
})
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -843,6 +846,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
|
|||||||
return xerrors.Errorf("error creating transactional mark set: %w", err)
|
return xerrors.Errorf("error creating transactional mark set: %w", err)
|
||||||
}
|
}
|
||||||
s.txnMarkSet = markSet
|
s.txnMarkSet = markSet
|
||||||
|
s.txnMissing = make(map[cid.Cid]struct{})
|
||||||
s.txnLk.Unlock()
|
s.txnLk.Unlock()
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
@ -851,10 +855,12 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
|
|||||||
s.txnActive = false
|
s.txnActive = false
|
||||||
s.txnProtect = nil
|
s.txnProtect = nil
|
||||||
s.txnMarkSet = nil
|
s.txnMarkSet = nil
|
||||||
|
s.txnMissing = nil
|
||||||
s.txnLk.Unlock()
|
s.txnLk.Unlock()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// 1.1 Update markset for references created during marking
|
// 1.1 Update markset for references created during marking
|
||||||
|
missing := make(map[cid.Cid]struct{})
|
||||||
if len(txnRefs) > 0 {
|
if len(txnRefs) > 0 {
|
||||||
log.Info("updating mark set for live references", "refs", len(txnRefs))
|
log.Info("updating mark set for live references", "refs", len(txnRefs))
|
||||||
startMark = time.Now()
|
startMark = time.Now()
|
||||||
@ -871,7 +877,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
err = s.walkObject(c, walked,
|
err = s.walkObjectIncomplete(c, walked,
|
||||||
func(c cid.Cid) error {
|
func(c cid.Cid) error {
|
||||||
mark, err := markSet.Has(c)
|
mark, err := markSet.Has(c)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -901,6 +907,10 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
|
|||||||
|
|
||||||
count++
|
count++
|
||||||
return markSet.Mark(c)
|
return markSet.Mark(c)
|
||||||
|
},
|
||||||
|
func(c cid.Cid) error {
|
||||||
|
missing[c] = struct{}{}
|
||||||
|
return errStopWalk
|
||||||
})
|
})
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -908,7 +918,76 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Infow("update marking set done", "took", time.Since(startMark), "marked", count)
|
log.Infow("update mark set done", "took", time.Since(startMark), "marked", count, "missing", len(missing))
|
||||||
|
}
|
||||||
|
|
||||||
|
// 1.2 if there were missing objects (presumably because they haven't been written yet),
|
||||||
|
// wait for them to be written and retry marking
|
||||||
|
if len(missing) > 0 {
|
||||||
|
log.Info("marking for missing objects")
|
||||||
|
startMark = time.Now()
|
||||||
|
try := 0
|
||||||
|
count = 0
|
||||||
|
|
||||||
|
for len(missing) > 0 {
|
||||||
|
if try >= 5 {
|
||||||
|
return xerrors.Errorf("missing %d objects after %d attempts; giving up", len(missing), try)
|
||||||
|
}
|
||||||
|
try++
|
||||||
|
|
||||||
|
// wait a bit
|
||||||
|
time.Sleep(time.Minute)
|
||||||
|
log.Infow("marking missing objects", "attempt", try, "missing", len(missing), "marked", count)
|
||||||
|
|
||||||
|
towalk := missing
|
||||||
|
missing = make(map[cid.Cid]struct{})
|
||||||
|
walked := cid.NewSet()
|
||||||
|
|
||||||
|
for c := range towalk {
|
||||||
|
mark, err := markSet.Has(c)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("error checking markset for %s: %w", c, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if mark {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
err = s.walkObjectIncomplete(c, walked,
|
||||||
|
func(c cid.Cid) error {
|
||||||
|
mark, err := markSet.Has(c)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("error checking markset for %s: %w", c, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if mark {
|
||||||
|
return errStopWalk
|
||||||
|
}
|
||||||
|
|
||||||
|
isOldBlock, err := s.isOldBlockHeader(c, lookbackEpoch)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("error checking object type for %s: %w", c, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if isOldBlock {
|
||||||
|
return errStopWalk
|
||||||
|
}
|
||||||
|
|
||||||
|
count++
|
||||||
|
return markSet.Mark(c)
|
||||||
|
},
|
||||||
|
func(c cid.Cid) error {
|
||||||
|
missing[c] = struct{}{}
|
||||||
|
return errStopWalk
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("error walking %s for marking: %w", c, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Infow("marking for missing objects done", "took", time.Since(startMark), "attempts", try, "marked", count)
|
||||||
}
|
}
|
||||||
|
|
||||||
// 2. iterate through the hotstore to collect cold objects
|
// 2. iterate through the hotstore to collect cold objects
|
||||||
@ -1115,15 +1194,28 @@ func (s *SplitStore) walkObject(c cid.Cid, walked *cid.Set, f func(cid.Cid) erro
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// like walkObject, but it visits leaves first, with pre invoked at the parent node to control
|
// like walkObject, but the object may be potentially incomplete (references missing from the hotstore)
|
||||||
// whether the walk should stop
|
func (s *SplitStore) walkObjectIncomplete(c cid.Cid, walked *cid.Set, f, missing func(cid.Cid) error) error {
|
||||||
func (s *SplitStore) walkObjectDeep(c cid.Cid, walked *cid.Set,
|
|
||||||
pre func(cid.Cid) error, f func(cid.Cid) error) error {
|
|
||||||
if !walked.Visit(c) {
|
if !walked.Visit(c) {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := pre(c); err != nil {
|
// occurs check
|
||||||
|
has, err := s.hot.Has(c)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("error occur checking %s: %w", c, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !has {
|
||||||
|
err = missing(c)
|
||||||
|
if err == errStopWalk {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := f(c); err != nil {
|
||||||
if err == errStopWalk {
|
if err == errStopWalk {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -1132,11 +1224,11 @@ func (s *SplitStore) walkObjectDeep(c cid.Cid, walked *cid.Set,
|
|||||||
}
|
}
|
||||||
|
|
||||||
if c.Prefix().Codec != cid.DagCBOR {
|
if c.Prefix().Codec != cid.DagCBOR {
|
||||||
return f(c)
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var links []cid.Cid
|
var links []cid.Cid
|
||||||
err := s.view(c, func(data []byte) error {
|
err = s.view(c, func(data []byte) error {
|
||||||
return cbg.ScanForLinks(bytes.NewReader(data), func(c cid.Cid) {
|
return cbg.ScanForLinks(bytes.NewReader(data), func(c cid.Cid) {
|
||||||
links = append(links, c)
|
links = append(links, c)
|
||||||
})
|
})
|
||||||
@ -1147,13 +1239,13 @@ func (s *SplitStore) walkObjectDeep(c cid.Cid, walked *cid.Set,
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, c := range links {
|
for _, c := range links {
|
||||||
err := s.walkObjectDeep(c, walked, pre, f)
|
err := s.walkObjectIncomplete(c, walked, f, missing)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("error walking link (cid: %s): %w", c, err)
|
return xerrors.Errorf("error walking link (cid: %s): %w", c, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return f(c)
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// internal version used by walk
|
// internal version used by walk
|
||||||
@ -1263,7 +1355,8 @@ func (s *SplitStore) purgeBatch(cids []cid.Cid, deleteBatch func([]cid.Cid) erro
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// don't delete one giant batch of 7M objects, but rather do smaller batches
|
// we don't delete one giant batch of millions of objects, but rather do smaller batches
|
||||||
|
// so that we don't stop the world for an extended period of time
|
||||||
done := false
|
done := false
|
||||||
for i := 0; !done; i++ {
|
for i := 0; !done; i++ {
|
||||||
start := i * batchSize
|
start := i * batchSize
|
||||||
@ -1289,11 +1382,52 @@ func (s *SplitStore) purge(curTs *types.TipSet, cids []cid.Cid) error {
|
|||||||
log.Infow("purged cold objects", "purged", purgeCnt, "live", liveCnt)
|
log.Infow("purged cold objects", "purged", purgeCnt, "live", liveCnt)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
protectMissing := func(missing map[cid.Cid]struct{}) error {
|
||||||
|
s.txnLk.RLock()
|
||||||
|
defer s.txnLk.RUnlock()
|
||||||
|
|
||||||
|
for c := range missing {
|
||||||
|
err := s.doTxnProtect(c, missing)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
return s.purgeBatch(cids,
|
return s.purgeBatch(cids,
|
||||||
func(cids []cid.Cid) error {
|
func(cids []cid.Cid) error {
|
||||||
deadCids := deadCids[:0]
|
deadCids := deadCids[:0]
|
||||||
|
|
||||||
|
// ideally this would be just s.txnLk.Lock() and defer s.txnLk.Unlock(), but we have to
|
||||||
|
// deal with incomplete object protection
|
||||||
|
try := 0
|
||||||
|
again:
|
||||||
s.txnLk.Lock()
|
s.txnLk.Lock()
|
||||||
|
if len(s.txnMissing) > 0 {
|
||||||
|
if try >= 5 {
|
||||||
|
count := len(s.txnMissing)
|
||||||
|
s.txnLk.Unlock()
|
||||||
|
return xerrors.Errorf("error purging: missing %d objects after %d attempts; giving up", count, try)
|
||||||
|
}
|
||||||
|
|
||||||
|
try++
|
||||||
|
log.Infof("delaying purge; missing %d protected objects (attempt: %d)", len(s.txnMissing))
|
||||||
|
missing := s.txnMissing
|
||||||
|
s.txnMissing = make(map[cid.Cid]struct{})
|
||||||
|
s.txnLk.Unlock()
|
||||||
|
|
||||||
|
if try > 1 {
|
||||||
|
time.Sleep(time.Minute)
|
||||||
|
}
|
||||||
|
err := protectMissing(missing)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("purge error: error protecting missing objects: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
goto again
|
||||||
|
}
|
||||||
defer s.txnLk.Unlock()
|
defer s.txnLk.Unlock()
|
||||||
|
|
||||||
for _, c := range cids {
|
for _, c := range cids {
|
||||||
|
Loading…
Reference in New Issue
Block a user