reinstate some better code for handling missing references.

This commit is contained in:
vyzo 2021-07-05 16:08:08 +03:00
parent fa195bede2
commit 59639a0788

View File

@ -150,6 +150,7 @@ type SplitStore struct {
txnMarkSet MarkSet txnMarkSet MarkSet
txnRefsMx sync.Mutex txnRefsMx sync.Mutex
txnRefs map[cid.Cid]struct{} txnRefs map[cid.Cid]struct{}
txnMissing map[cid.Cid]struct{}
} }
var _ bstore.Blockstore = (*SplitStore)(nil) var _ bstore.Blockstore = (*SplitStore)(nil)
@ -677,6 +678,11 @@ func (s *SplitStore) doTxnProtect(root cid.Cid, batch map[cid.Cid]struct{}) erro
}, },
func(c cid.Cid) error { func(c cid.Cid) error {
log.Warnf("missing object %s in %s", c, root) log.Warnf("missing object %s in %s", c, root)
if s.txnMissing != nil {
s.txnRefsMx.Lock()
s.txnMissing[c] = struct{}{}
s.txnRefsMx.Unlock()
}
return errStopWalk return errStopWalk
}) })
@ -845,6 +851,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
s.txnLk.Lock() s.txnLk.Lock()
txnRefs := s.txnRefs txnRefs := s.txnRefs
s.txnRefs = nil s.txnRefs = nil
s.txnMissing = make(map[cid.Cid]struct{})
s.txnProtect, err = s.txnEnv.Create("protected", 0) s.txnProtect, err = s.txnEnv.Create("protected", 0)
if err != nil { if err != nil {
s.txnLk.Unlock() s.txnLk.Unlock()
@ -859,10 +866,12 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
s.txnActive = false s.txnActive = false
s.txnProtect = nil s.txnProtect = nil
s.txnMarkSet = nil s.txnMarkSet = nil
s.txnMissing = nil
s.txnLk.Unlock() s.txnLk.Unlock()
}() }()
// 1.1 Update markset for references created during marking // 1.1 Update markset for references created during marking
missing := make(map[cid.Cid]struct{})
if len(txnRefs) > 0 { if len(txnRefs) > 0 {
log.Infow("updating mark set for live references", "refs", len(txnRefs)) log.Infow("updating mark set for live references", "refs", len(txnRefs))
startMark = time.Now() startMark = time.Now()
@ -920,6 +929,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
}, },
func(c cid.Cid) error { func(c cid.Cid) error {
log.Warnf("missing object for marking: %s", c) log.Warnf("missing object for marking: %s", c)
missing[c] = struct{}{}
return errStopWalk return errStopWalk
}) })
@ -928,7 +938,12 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
} }
} }
log.Infow("update mark set done", "took", time.Since(startMark), "marked", count) log.Infow("update mark set done", "took", time.Since(startMark), "marked", count, "missing", len(missing))
}
// 1.2 if there are missing references wait a bit for them to see if they are written later
if len(missing) > 0 {
s.waitForMissingRefs(missing, markSet, nil)
} }
// 2. iterate through the hotstore to collect cold objects // 2. iterate through the hotstore to collect cold objects
@ -962,7 +977,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
return xerrors.Errorf("error collecting candidate cold objects: %w", err) return xerrors.Errorf("error collecting candidate cold objects: %w", err)
} }
log.Infow("candidate collection done", "took", time.Since(startCollect)) log.Infow("cold collection done", "took", time.Since(startCollect))
if coldCnt > 0 { if coldCnt > 0 {
s.coldPurgeSize = coldCnt + coldCnt>>2 // overestimate a bit s.coldPurgeSize = coldCnt + coldCnt>>2 // overestimate a bit
@ -972,6 +987,17 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
stats.Record(context.Background(), metrics.SplitstoreCompactionHot.M(int64(hotCnt))) stats.Record(context.Background(), metrics.SplitstoreCompactionHot.M(int64(hotCnt)))
stats.Record(context.Background(), metrics.SplitstoreCompactionCold.M(int64(coldCnt))) stats.Record(context.Background(), metrics.SplitstoreCompactionCold.M(int64(coldCnt)))
// now that we have collected cold objects, check for missing references from transactional i/o
// and disable further collection of such references (they will not be acted upon)
s.txnLk.Lock()
missing = s.txnMissing
s.txnMissing = nil
s.txnLk.Unlock()
if len(missing) > 0 {
s.waitForMissingRefs(missing, s.txnProtect, markSet)
}
// 3. copy the cold objects to the coldstore -- if we have one // 3. copy the cold objects to the coldstore -- if we have one
if !s.cfg.DiscardColdBlocks { if !s.cfg.DiscardColdBlocks {
log.Info("moving cold objects to the coldstore") log.Info("moving cold objects to the coldstore")
@ -1367,6 +1393,86 @@ func (s *SplitStore) purge(curTs *types.TipSet, cids []cid.Cid) error {
}) })
} }
// I really don't like having this code, but we seem to have some DAG references with missing
// constituents. During testing in mainnet *some* of these references *sometimes* appeared after a
// little bit.
// We need to figure out where they are coming from and eliminate that vector, but until then we
// have this gem[TM].
func (s *SplitStore) waitForMissingRefs(missing map[cid.Cid]struct{}, markSet, ctlSet MarkSet) {
log.Info("waiting for missing references")
start := time.Now()
count := 0
defer func() {
log.Infow("waiting for missing references done", "took", time.Since(start), "marked", count)
}()
for i := 1; i <= 3 && len(missing) > 0; i++ {
wait := time.Duration(i) * time.Minute
log.Infof("retrying for %d missing references in %s (attempt: %d)", len(missing), wait, i)
time.Sleep(wait)
towalk := missing
walked := cid.NewSet()
missing = make(map[cid.Cid]struct{})
for c := range towalk {
err := s.walkObjectIncomplete(c, walked,
func(c cid.Cid) error {
if isFilCommitment(c) {
return errStopWalk
}
mark, err := markSet.Has(c)
if err != nil {
return xerrors.Errorf("error checking markset for %s: %w", c, err)
}
if mark {
return errStopWalk
}
if ctlSet != nil {
mark, err = ctlSet.Has(c)
if err != nil {
return xerrors.Errorf("error checking markset for %s: %w", c, err)
}
if mark {
return errStopWalk
}
}
isOldBlock, err := s.isOldBlockHeader(c, s.txnLookbackEpoch)
if err != nil {
return xerrors.Errorf("error checking object type for %s: %w", c, err)
}
if isOldBlock {
return errStopWalk
}
count++
return markSet.Mark(c)
},
func(c cid.Cid) error {
missing[c] = struct{}{}
return errStopWalk
})
if err != nil {
log.Warnf("error marking: %s", err)
}
}
}
if len(missing) > 0 {
log.Warnf("still missing %d references", len(missing))
for c := range missing {
log.Warnf("unresolved missing reference: %s", c)
}
}
}
func (s *SplitStore) gcHotstore() { func (s *SplitStore) gcHotstore() {
if compact, ok := s.hot.(interface{ Compact() error }); ok { if compact, ok := s.hot.(interface{ Compact() error }); ok {
log.Infof("compacting hotstore") log.Infof("compacting hotstore")