deal with partially written objects
This commit is contained in:
parent
0a1d7b3732
commit
19d1b1f532
@ -79,6 +79,8 @@ var (
|
|||||||
enableDebugLog = false
|
enableDebugLog = false
|
||||||
// set this to true if you want to track origin stack traces in the write log
|
// set this to true if you want to track origin stack traces in the write log
|
||||||
enableDebugLogWriteTraces = false
|
enableDebugLogWriteTraces = false
|
||||||
|
|
||||||
|
maxMissingScanRetries = 3
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -633,7 +635,7 @@ func (s *SplitStore) trackTxnRef(c cid.Cid, recursive bool) error {
|
|||||||
return xerrors.Errorf("error checking hotstore for %s: %w", c, err)
|
return xerrors.Errorf("error checking hotstore for %s: %w", c, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// it has been deleted, signal to the vm to copy
|
// it's not there (might have been deleted), signal to the vm to copy
|
||||||
if !has {
|
if !has {
|
||||||
log.Warnf("missing object for recursive reference to %s", c)
|
log.Warnf("missing object for recursive reference to %s", c)
|
||||||
return errMissingObject
|
return errMissingObject
|
||||||
@ -846,6 +848,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
|
|||||||
startMark = time.Now()
|
startMark = time.Now()
|
||||||
walked := cid.NewSet()
|
walked := cid.NewSet()
|
||||||
count = 0
|
count = 0
|
||||||
|
var missing []cid.Cid
|
||||||
for c := range txnRefs {
|
for c := range txnRefs {
|
||||||
mark, err := markSet.Has(c)
|
mark, err := markSet.Has(c)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -871,10 +874,71 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
|
|||||||
})
|
})
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("error walking %s for marking: %w", c, err)
|
if xerrors.Is(err, bstore.ErrNotFound) {
|
||||||
|
log.Warnf("missing or incomplete object: %s", c)
|
||||||
|
missing = append(missing, c)
|
||||||
|
} else {
|
||||||
|
return xerrors.Errorf("error walking %s for marking: %w", c, err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
log.Infow("update marking set done", "took", time.Since(startMark), "marked", count)
|
log.Infow("update marking set done", "took", time.Since(startMark), "marked", count, "missing", len(missing))
|
||||||
|
|
||||||
|
// 1.2 rescan for missing objects (after waiting a minute), as they might have not been copied yet
|
||||||
|
// by the vm
|
||||||
|
if len(missing) > 0 {
|
||||||
|
try := 0
|
||||||
|
|
||||||
|
log.Info("rescanning for missing objects")
|
||||||
|
startMark = time.Now()
|
||||||
|
|
||||||
|
for len(missing) > 0 {
|
||||||
|
if try > maxMissingScanRetries {
|
||||||
|
return xerrors.Errorf("failed to fully scan transactional refs; %d missing objects", len(missing))
|
||||||
|
}
|
||||||
|
|
||||||
|
// discard previous walk short-cuts
|
||||||
|
walked = cid.NewSet()
|
||||||
|
towalk := missing
|
||||||
|
missing = nil
|
||||||
|
try++
|
||||||
|
|
||||||
|
log.Infof("rescanning for %d missing objects (attempt %d)", len(towalk), try)
|
||||||
|
// wait a minute first for in-flight writes to complete
|
||||||
|
time.Sleep(time.Minute)
|
||||||
|
|
||||||
|
for _, c := range towalk {
|
||||||
|
// we can't reliably check the markset and short-circuit this time, we have to do full walks
|
||||||
|
// because the object was previously visited top-to-bottom, with root DAGs short circuiting
|
||||||
|
// their children.
|
||||||
|
// but we *can* short-circuit on the txn protection filter, as this implies that the object
|
||||||
|
// will be protected from purge.
|
||||||
|
err = s.walkObject(c, walked, func(c cid.Cid) error {
|
||||||
|
mark, err := s.txnProtect.Has(c)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("error checking protected set for %s: %w", c, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if mark {
|
||||||
|
return errStopWalk
|
||||||
|
}
|
||||||
|
|
||||||
|
return markSet.Mark(c)
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
if xerrors.Is(err, bstore.ErrNotFound) {
|
||||||
|
log.Warnf("missing or incomplete object: %s", c)
|
||||||
|
missing = append(missing, c)
|
||||||
|
} else {
|
||||||
|
return xerrors.Errorf("error walking %s for marking: %w", c, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Infow("rescanning done", "took", time.Since(startMark))
|
||||||
|
}
|
||||||
|
|
||||||
// 2. iterate through the hotstore to collect cold objects
|
// 2. iterate through the hotstore to collect cold objects
|
||||||
log.Info("collecting cold objects")
|
log.Info("collecting cold objects")
|
||||||
|
Loading…
Reference in New Issue
Block a user