get rid of ugly missing reference handling code
those missing objects don't seem to ever get there, are they from an abandoned fork?
This commit is contained in:
parent
59936ef468
commit
fa195bede2
@ -148,7 +148,6 @@ type SplitStore struct {
|
||||
txnEnv MarkSetEnv
|
||||
txnProtect MarkSet
|
||||
txnMarkSet MarkSet
|
||||
txnMissing map[cid.Cid]struct{}
|
||||
txnRefsMx sync.Mutex
|
||||
txnRefs map[cid.Cid]struct{}
|
||||
}
|
||||
@ -630,8 +629,6 @@ func (s *SplitStore) trackTxnRefMany(cids []cid.Cid) error {
|
||||
func (s *SplitStore) doTxnProtect(root cid.Cid, batch map[cid.Cid]struct{}) error {
|
||||
// Note: cold objects are deleted heaviest first, so the consituents of an object
|
||||
// cannot be deleted before the object itself.
|
||||
// Note on this missing business: it appears that some DAGs can be written before their
|
||||
// consituents. THIS NEEDS TO BE FIXED -- but until then we do this missing dance business
|
||||
err := s.walkObjectIncomplete(root, cid.NewSet(),
|
||||
func(c cid.Cid) error {
|
||||
if isFilCommitment(c) {
|
||||
@ -680,7 +677,6 @@ func (s *SplitStore) doTxnProtect(root cid.Cid, batch map[cid.Cid]struct{}) erro
|
||||
},
|
||||
func(c cid.Cid) error {
|
||||
log.Warnf("missing object %s in %s", c, root)
|
||||
s.txnMissing[c] = struct{}{}
|
||||
return errStopWalk
|
||||
})
|
||||
|
||||
@ -855,7 +851,6 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
|
||||
return xerrors.Errorf("error creating transactional mark set: %w", err)
|
||||
}
|
||||
s.txnMarkSet = markSet
|
||||
s.txnMissing = make(map[cid.Cid]struct{})
|
||||
s.txnLk.Unlock()
|
||||
|
||||
defer func() {
|
||||
@ -864,14 +859,10 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
|
||||
s.txnActive = false
|
||||
s.txnProtect = nil
|
||||
s.txnMarkSet = nil
|
||||
s.txnMissing = nil
|
||||
s.txnLk.Unlock()
|
||||
}()
|
||||
|
||||
// 1.1 Update markset for references created during marking
|
||||
// Note on this missing business: it appears that some DAGs can be written before their
|
||||
// consituents. THIS NEEDS TO BE FIXED -- but until then we do this missing dance business
|
||||
missing := make(map[cid.Cid]struct{})
|
||||
if len(txnRefs) > 0 {
|
||||
log.Infow("updating mark set for live references", "refs", len(txnRefs))
|
||||
startMark = time.Now()
|
||||
@ -929,7 +920,6 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
|
||||
},
|
||||
func(c cid.Cid) error {
|
||||
log.Warnf("missing object for marking: %s", c)
|
||||
missing[c] = struct{}{}
|
||||
return errStopWalk
|
||||
})
|
||||
|
||||
@ -938,85 +928,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
|
||||
}
|
||||
}
|
||||
|
||||
log.Infow("update mark set done", "took", time.Since(startMark), "marked", count, "missing", len(missing))
|
||||
}
|
||||
|
||||
// 1.2 if there were missing objects (presumably because they haven't been written yet),
|
||||
// wait for them to be written and retry marking
|
||||
if len(missing) > 0 {
|
||||
log.Info("marking for missing objects")
|
||||
startMark = time.Now()
|
||||
try := 0
|
||||
count = 0
|
||||
|
||||
for len(missing) > 0 {
|
||||
if try >= 100 {
|
||||
return xerrors.Errorf("missing %d objects after %d attempts; giving up", len(missing), try)
|
||||
}
|
||||
try++
|
||||
|
||||
// wait a bit
|
||||
time.Sleep(time.Minute)
|
||||
log.Infow("marking missing objects", "attempt", try, "missing", len(missing), "marked", count)
|
||||
|
||||
towalk := missing
|
||||
missing = make(map[cid.Cid]struct{})
|
||||
walked := cid.NewSet()
|
||||
|
||||
for c := range towalk {
|
||||
if isFilCommitment(c) {
|
||||
continue
|
||||
}
|
||||
|
||||
mark, err := markSet.Has(c)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error checking markset for %s: %w", c, err)
|
||||
}
|
||||
|
||||
if mark {
|
||||
continue
|
||||
}
|
||||
|
||||
err = s.walkObjectIncomplete(c, walked,
|
||||
func(c cid.Cid) error {
|
||||
if isFilCommitment(c) {
|
||||
return errStopWalk
|
||||
}
|
||||
|
||||
mark, err := markSet.Has(c)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error checking markset for %s: %w", c, err)
|
||||
}
|
||||
|
||||
if mark {
|
||||
return errStopWalk
|
||||
}
|
||||
|
||||
isOldBlock, err := s.isOldBlockHeader(c, lookbackEpoch)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error checking object type for %s: %w", c, err)
|
||||
}
|
||||
|
||||
if isOldBlock {
|
||||
return errStopWalk
|
||||
}
|
||||
|
||||
count++
|
||||
return markSet.Mark(c)
|
||||
},
|
||||
func(c cid.Cid) error {
|
||||
log.Warnf("missing object for marking: %s", c)
|
||||
missing[c] = struct{}{}
|
||||
return errStopWalk
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error walking %s for marking: %w", c, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
log.Infow("marking for missing objects done", "took", time.Since(startMark), "attempts", try, "marked", count)
|
||||
log.Infow("update mark set done", "took", time.Since(startMark), "marked", count)
|
||||
}
|
||||
|
||||
// 2. iterate through the hotstore to collect cold objects
|
||||
@ -1423,52 +1335,11 @@ func (s *SplitStore) purge(curTs *types.TipSet, cids []cid.Cid) error {
|
||||
log.Infow("purged cold objects", "purged", purgeCnt, "live", liveCnt)
|
||||
}()
|
||||
|
||||
// Note on this missing business: it appears that some DAGs can be written before their
|
||||
// consituents. THIS NEEDS TO BE FIXED -- but until then we do this missing dance business
|
||||
protectMissing := func(missing map[cid.Cid]struct{}) error {
|
||||
s.txnLk.RLock()
|
||||
defer s.txnLk.RUnlock()
|
||||
|
||||
for c := range missing {
|
||||
err := s.doTxnProtect(c, missing)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
return s.purgeBatch(cids,
|
||||
func(cids []cid.Cid) error {
|
||||
deadCids := deadCids[:0]
|
||||
|
||||
// ideally this would be just s.txnLk.Lock() and defer s.txnLk.Unlock(), but we have to
|
||||
// deal with incomplete object protection
|
||||
try := 0
|
||||
again:
|
||||
s.txnLk.Lock()
|
||||
if len(s.txnMissing) > 0 {
|
||||
if try >= 100 {
|
||||
count := len(s.txnMissing)
|
||||
s.txnLk.Unlock()
|
||||
return xerrors.Errorf("error purging: missing %d objects after %d attempts; giving up", count, try)
|
||||
}
|
||||
|
||||
try++
|
||||
log.Infof("delaying purge; missing %d protected objects (attempt: %d)", len(s.txnMissing))
|
||||
missing := s.txnMissing
|
||||
s.txnMissing = make(map[cid.Cid]struct{})
|
||||
s.txnLk.Unlock()
|
||||
|
||||
time.Sleep(time.Minute)
|
||||
err := protectMissing(missing)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("purge error: error protecting missing objects: %w", err)
|
||||
}
|
||||
|
||||
goto again
|
||||
}
|
||||
defer s.txnLk.Unlock()
|
||||
|
||||
for _, c := range cids {
|
||||
|
Loading…
Reference in New Issue
Block a user