diff --git a/blockstore/splitstore/splitstore.go b/blockstore/splitstore/splitstore.go index 4c537788d..eabf6a3ca 100644 --- a/blockstore/splitstore/splitstore.go +++ b/blockstore/splitstore/splitstore.go @@ -79,6 +79,8 @@ var ( enableDebugLog = false // set this to true if you want to track origin stack traces in the write log enableDebugLogWriteTraces = false + + maxMissingScanRetries = 3 ) const ( @@ -633,7 +635,7 @@ func (s *SplitStore) trackTxnRef(c cid.Cid, recursive bool) error { return xerrors.Errorf("error checking hotstore for %s: %w", c, err) } - // it has been deleted, signal to the vm to copy + // it's not there (might have been deleted), signal to the vm to copy if !has { log.Warnf("missing object for recursive reference to %s", c) return errMissingObject @@ -846,6 +848,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { startMark = time.Now() walked := cid.NewSet() count = 0 + var missing []cid.Cid for c := range txnRefs { mark, err := markSet.Has(c) if err != nil { @@ -871,10 +874,71 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { }) if err != nil { - return xerrors.Errorf("error walking %s for marking: %w", c, err) + if xerrors.Is(err, bstore.ErrNotFound) { + log.Warnf("missing or incomplete object: %s", c) + missing = append(missing, c) + } else { + return xerrors.Errorf("error walking %s for marking: %w", c, err) + } } } - log.Infow("update marking set done", "took", time.Since(startMark), "marked", count) + log.Infow("update marking set done", "took", time.Since(startMark), "marked", count, "missing", len(missing)) + + // 1.2 rescan for missing objects (after waiting a minute), as they might have not been copied yet + // by the vm + if len(missing) > 0 { + try := 0 + + log.Info("rescanning for missing objects") + startMark = time.Now() + + for len(missing) > 0 { + if try > maxMissingScanRetries { + return xerrors.Errorf("failed to fully scan transactional refs; %d missing objects", len(missing)) + } + + // discard previous walk short-cuts + walked = cid.NewSet() + towalk := missing + missing = nil + try++ + + log.Infof("rescanning for %d missing objects (attempt %d)", len(towalk), try) + // wait a minute first for in-flight writes to complete + time.Sleep(time.Minute) + + for _, c := range towalk { + // we can't reliably check the markset and short-circuit this time, we have to do full walks + // because the object was previously visited top-to-bottom, with root DAGs short circuiting + // their children. + // but we *can* short-circuit on the txn protection filter, as this implies that the object + // will be protected from purge. + err = s.walkObject(c, walked, func(c cid.Cid) error { + mark, err := s.txnProtect.Has(c) + if err != nil { + return xerrors.Errorf("error checking protected set for %s: %w", c, err) + } + + if mark { + return errStopWalk + } + + return markSet.Mark(c) + }) + + if err != nil { + if xerrors.Is(err, bstore.ErrNotFound) { + log.Warnf("missing or incomplete object: %s", c) + missing = append(missing, c) + } else { + return xerrors.Errorf("error walking %s for marking: %w", c, err) + } + } + } + } + + log.Infow("rescanning done", "took", time.Since(startMark)) + } // 2. iterate through the hotstore to collect cold objects log.Info("collecting cold objects")