reinstante waitForMissingRefs
This commit is contained in:
parent
7931f1f8f9
commit
7b8447a95a
@ -521,6 +521,15 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// now that we have collected cold objects, check for missing references from transactional i/o
|
||||
// and disable further collection of such references (they will not be acted upon as we can't
|
||||
// possibly delete objects we didn't have when we were collecting cold objects)
|
||||
s.waitForMissingRefs(markSet)
|
||||
|
||||
if err := s.checkClosing(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
coldr, err := NewColdSetReader(s.coldSetPath())
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error opening coldset: %w", err)
|
||||
@ -1180,3 +1189,80 @@ func (s *SplitStore) completePurge(coldr *ColdSetReader, checkpoint *Checkpoint,
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// I really don't like having this code, but we seem to have some occasional DAG references with
|
||||
// missing constituents. During testing in mainnet *some* of these references *sometimes* appeared
|
||||
// after a little bit.
|
||||
// We need to figure out where they are coming from and eliminate that vector, but until then we
|
||||
// have this gem[TM].
|
||||
// My best guess is that they are parent message receipts or yet to be computed state roots; magik
|
||||
// thinks the cause may be block validation.
|
||||
func (s *SplitStore) waitForMissingRefs(markSet MarkSet) {
|
||||
s.txnLk.Lock()
|
||||
missing := s.txnMissing
|
||||
s.txnMissing = nil
|
||||
s.txnLk.Unlock()
|
||||
|
||||
if len(missing) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
log.Info("waiting for missing references")
|
||||
start := time.Now()
|
||||
count := 0
|
||||
defer func() {
|
||||
log.Infow("waiting for missing references done", "took", time.Since(start), "marked", count)
|
||||
}()
|
||||
|
||||
for i := 0; i < 3 && len(missing) > 0; i++ {
|
||||
if err := s.checkClosing(); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
wait := time.Duration(i) * time.Minute
|
||||
log.Infof("retrying for %d missing references in %s (attempt: %d)", len(missing), wait, i+1)
|
||||
if wait > 0 {
|
||||
time.Sleep(wait)
|
||||
}
|
||||
|
||||
towalk := missing
|
||||
visitor := newTmpVisitor()
|
||||
missing = make(map[cid.Cid]struct{})
|
||||
|
||||
for c := range towalk {
|
||||
err := s.walkObjectIncomplete(c, visitor,
|
||||
func(c cid.Cid) error {
|
||||
if isUnitaryObject(c) {
|
||||
return errStopWalk
|
||||
}
|
||||
|
||||
visit, err := markSet.Visit(c)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error visiting object: %w", err)
|
||||
}
|
||||
|
||||
if !visit {
|
||||
return errStopWalk
|
||||
}
|
||||
|
||||
count++
|
||||
return nil
|
||||
},
|
||||
func(c cid.Cid) error {
|
||||
missing[c] = struct{}{}
|
||||
return errStopWalk
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
log.Warnf("error marking: %s", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if len(missing) > 0 {
|
||||
log.Warnf("still missing %d references", len(missing))
|
||||
for c := range missing {
|
||||
log.Warnf("unresolved missing reference: %s", c)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user