handle all missing refs together
so that we wait 6min at most, not 12.
This commit is contained in:
parent
5a099b7d05
commit
af8cf712be
@ -677,7 +677,7 @@ func (s *SplitStore) doTxnProtect(root cid.Cid, batch map[cid.Cid]struct{}) erro
|
|||||||
return s.txnProtect.Mark(c)
|
return s.txnProtect.Mark(c)
|
||||||
},
|
},
|
||||||
func(c cid.Cid) error {
|
func(c cid.Cid) error {
|
||||||
log.Warnf("missing object %s in %s", c, root)
|
log.Warnf("missing object reference %s in %s", c, root)
|
||||||
if s.txnMissing != nil {
|
if s.txnMissing != nil {
|
||||||
s.txnRefsMx.Lock()
|
s.txnRefsMx.Lock()
|
||||||
s.txnMissing[c] = struct{}{}
|
s.txnMissing[c] = struct{}{}
|
||||||
@ -871,7 +871,6 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
|
|||||||
}()
|
}()
|
||||||
|
|
||||||
// 1.1 Update markset for references created during marking
|
// 1.1 Update markset for references created during marking
|
||||||
missing := make(map[cid.Cid]struct{})
|
|
||||||
if len(txnRefs) > 0 {
|
if len(txnRefs) > 0 {
|
||||||
log.Infow("updating mark set for live references", "refs", len(txnRefs))
|
log.Infow("updating mark set for live references", "refs", len(txnRefs))
|
||||||
startMark = time.Now()
|
startMark = time.Now()
|
||||||
@ -927,9 +926,11 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
|
|||||||
count++
|
count++
|
||||||
return markSet.Mark(c)
|
return markSet.Mark(c)
|
||||||
},
|
},
|
||||||
func(c cid.Cid) error {
|
func(cm cid.Cid) error {
|
||||||
log.Warnf("missing object for marking: %s", c)
|
log.Warnf("missing object reference %s in %s", cm, c)
|
||||||
missing[c] = struct{}{}
|
s.txnRefsMx.Lock()
|
||||||
|
s.txnMissing[cm] = struct{}{}
|
||||||
|
s.txnRefsMx.Unlock()
|
||||||
return errStopWalk
|
return errStopWalk
|
||||||
})
|
})
|
||||||
|
|
||||||
@ -938,12 +939,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Infow("update mark set done", "took", time.Since(startMark), "marked", count, "missing", len(missing))
|
log.Infow("update mark set done", "took", time.Since(startMark), "marked", count)
|
||||||
}
|
|
||||||
|
|
||||||
// 1.2 if there are missing references wait a bit for them to see if they are written later
|
|
||||||
if len(missing) > 0 {
|
|
||||||
s.waitForMissingRefs(missing, markSet, nil)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// 2. iterate through the hotstore to collect cold objects
|
// 2. iterate through the hotstore to collect cold objects
|
||||||
@ -989,14 +985,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
|
|||||||
|
|
||||||
// now that we have collected cold objects, check for missing references from transactional i/o
|
// now that we have collected cold objects, check for missing references from transactional i/o
|
||||||
// and disable further collection of such references (they will not be acted upon)
|
// and disable further collection of such references (they will not be acted upon)
|
||||||
s.txnLk.Lock()
|
s.waitForMissingRefs(markSet)
|
||||||
missing = s.txnMissing
|
|
||||||
s.txnMissing = nil
|
|
||||||
s.txnLk.Unlock()
|
|
||||||
|
|
||||||
if len(missing) > 0 {
|
|
||||||
s.waitForMissingRefs(missing, s.txnProtect, markSet)
|
|
||||||
}
|
|
||||||
|
|
||||||
// 3. copy the cold objects to the coldstore -- if we have one
|
// 3. copy the cold objects to the coldstore -- if we have one
|
||||||
if !s.cfg.DiscardColdBlocks {
|
if !s.cfg.DiscardColdBlocks {
|
||||||
@ -1399,7 +1388,16 @@ func (s *SplitStore) purge(curTs *types.TipSet, cids []cid.Cid) error {
|
|||||||
// We need to figure out where they are coming from and eliminate that vector, but until then we
|
// We need to figure out where they are coming from and eliminate that vector, but until then we
|
||||||
// have this gem[TM].
|
// have this gem[TM].
|
||||||
// My best guess is that they are parent message receipts or yet to be computed state roots.
|
// My best guess is that they are parent message receipts or yet to be computed state roots.
|
||||||
func (s *SplitStore) waitForMissingRefs(missing map[cid.Cid]struct{}, markSet, ctlSet MarkSet) {
|
func (s *SplitStore) waitForMissingRefs(markSet MarkSet) {
|
||||||
|
s.txnLk.Lock()
|
||||||
|
missing := s.txnMissing
|
||||||
|
s.txnMissing = nil
|
||||||
|
s.txnLk.Unlock()
|
||||||
|
|
||||||
|
if len(missing) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
log.Info("waiting for missing references")
|
log.Info("waiting for missing references")
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
count := 0
|
count := 0
|
||||||
@ -1432,8 +1430,7 @@ func (s *SplitStore) waitForMissingRefs(missing map[cid.Cid]struct{}, markSet, c
|
|||||||
return errStopWalk
|
return errStopWalk
|
||||||
}
|
}
|
||||||
|
|
||||||
if ctlSet != nil {
|
mark, err = s.txnProtect.Has(c)
|
||||||
mark, err = ctlSet.Has(c)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("error checking markset for %s: %w", c, err)
|
return xerrors.Errorf("error checking markset for %s: %w", c, err)
|
||||||
}
|
}
|
||||||
@ -1441,7 +1438,6 @@ func (s *SplitStore) waitForMissingRefs(missing map[cid.Cid]struct{}, markSet, c
|
|||||||
if mark {
|
if mark {
|
||||||
return errStopWalk
|
return errStopWalk
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
isOldBlock, err := s.isOldBlockHeader(c, s.txnLookbackEpoch)
|
isOldBlock, err := s.isOldBlockHeader(c, s.txnLookbackEpoch)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -1453,7 +1449,7 @@ func (s *SplitStore) waitForMissingRefs(missing map[cid.Cid]struct{}, markSet, c
|
|||||||
}
|
}
|
||||||
|
|
||||||
count++
|
count++
|
||||||
return markSet.Mark(c)
|
return s.txnProtect.Mark(c)
|
||||||
},
|
},
|
||||||
func(c cid.Cid) error {
|
func(c cid.Cid) error {
|
||||||
missing[c] = struct{}{}
|
missing[c] = struct{}{}
|
||||||
|
Loading…
Reference in New Issue
Block a user