diff --git a/blockstore/splitstore/splitstore.go b/blockstore/splitstore/splitstore.go index 63490d1d6..58e76329d 100644 --- a/blockstore/splitstore/splitstore.go +++ b/blockstore/splitstore/splitstore.go @@ -77,8 +77,6 @@ var ( // used to signal end of walk errStopWalk = errors.New("stop walk") - // used to signal a missing object when protecting recursive references - errMissingObject = errors.New("missing object") // set this to true if you are debugging the splitstore to enable debug logging enableDebugLog = false @@ -226,17 +224,7 @@ func (s *SplitStore) Has(c cid.Cid) (bool, error) { } if has { - // treat it as an implicit (recursive) Write, when it is within vm.Copy context. - // -- the vm uses this check to avoid duplicate writes on Copy. - // When we have options in the API (or something better), the vm can explicitly signal - // that this is an implicit Write. - err = s.trackTxnRef(c, true) - if xerrors.Is(err, errMissingObject) { - // we failed to recursively protect the object because some inner object has been purged; - // signal to the VM to copy. - return false, nil - } - + err = s.trackTxnRef(c) return true, err } @@ -251,7 +239,7 @@ func (s *SplitStore) Get(cid cid.Cid) (blocks.Block, error) { switch err { case nil: - err = s.trackTxnRef(cid, false) + err = s.trackTxnRef(cid) return blk, err case bstore.ErrNotFound: @@ -285,7 +273,7 @@ func (s *SplitStore) GetSize(cid cid.Cid) (int, error) { switch err { case nil: - err = s.trackTxnRef(cid, false) + err = s.trackTxnRef(cid) return size, err case bstore.ErrNotFound: @@ -323,7 +311,7 @@ func (s *SplitStore) Put(blk blocks.Block) error { s.mx.Unlock() s.debug.LogWrite(curTs, blk, writeEpoch) } - err = s.trackTxnRef(blk.Cid(), false) + err = s.trackTxnRef(blk.Cid()) } return err @@ -400,7 +388,7 @@ func (s *SplitStore) View(cid cid.Cid, cb func([]byte) error) error { err := s.hot.View(cid, cb) switch err { case nil: - err = s.trackTxnRef(cid, false) + err = s.trackTxnRef(cid) return err case bstore.ErrNotFound: @@ -590,7 +578,7 @@ func (s *SplitStore) updateWriteEpoch() { } } -func (s *SplitStore) trackTxnRef(c cid.Cid, deep bool) error { +func (s *SplitStore) trackTxnRef(c cid.Cid) error { if !s.txnActive { // not compacting return nil @@ -605,11 +593,7 @@ func (s *SplitStore) trackTxnRef(c cid.Cid, deep bool) error { } // we have finished marking, protect the reference - if !deep { - return s.doTxnProtect(c, nil) - } - - return s.doTxnProtectDeep(c) + return s.doTxnProtect(c, nil) } func (s *SplitStore) trackTxnRefMany(cids []cid.Cid) error { @@ -645,8 +629,10 @@ func (s *SplitStore) trackTxnRefMany(cids []cid.Cid) error { } func (s *SplitStore) doTxnProtect(root cid.Cid, batch map[cid.Cid]struct{}) error { - // it's a shallow reference, protect with a standard walk without occur checking - return s.walkObject(root, cid.NewSet(), + // Note: cold objects are deleted heaviest first, so the consituents of an object + // cannot be deleted before the object itself. + // so we just do a regular walk and mark in the protected set. + err := s.walkObject(root, cid.NewSet(), func(c cid.Cid) error { if c != root { _, ok := batch[c] @@ -690,58 +676,12 @@ func (s *SplitStore) doTxnProtect(root cid.Cid, batch map[cid.Cid]struct{}) erro return s.txnProtect.Mark(c) }) -} -func (s *SplitStore) doTxnProtectDeep(root cid.Cid) error { - // it's a deep reference potentially in vm context - // we do a deep walk to visit the children first, short-circuiting if the parent has been marked. - // the deep walk is necessary as internal references may be missing, e.g. because a defunct object - // got recreated by the VM. - return s.walkObjectDeep(root, cid.NewSet(), - func(c cid.Cid) error { - mark, err := s.txnMarkSet.Has(c) - if err != nil { - return xerrors.Errorf("error checking mark set for %s: %w", c, err) - } + if err != nil { + log.Warnf("error protecting object (cid: %s): %s", root, err) + } - // it's marked, nothing to do - if mark { - return errStopWalk - } - - // old block reference -- see comment in doCompact about the necessity of this - isOldBlock, err := s.isOldBlockHeader(c, s.txnLookbackEpoch) - if err != nil { - return xerrors.Errorf("error checking object type for %s: %w", c, err) - } - - if isOldBlock { - return errStopWalk - } - - return nil - }, - func(c cid.Cid) error { - // this occurs check is necessary because cold objects are purged in arbitrary order - has, err := s.hot.Has(c) - if err != nil { - return xerrors.Errorf("error checking hotstore for %s: %w", c, err) - } - - // it's not there (might have been deleted), signal to the vm to copy - if !has { - log.Warnf("missing object for recursive reference to %s", c) - return errMissingObject - } - - // mark it in *both* sets, so that we can short-circuit a concurrent walk. - err = s.txnMarkSet.Mark(c) - if err != nil { - return xerrors.Errorf("error marking %s: %w", c, err) - } - - return s.txnProtect.Mark(c) - }) + return err } func (s *SplitStore) warmup(curTs *types.TipSet) error { @@ -917,7 +857,6 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { }() // 1.1 Update markset for references created during marking - var missing []cid.Cid if len(txnRefs) > 0 { log.Info("updating mark set for live references", "refs", len(txnRefs)) startMark = time.Now() @@ -934,9 +873,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { continue } - // we have to do a deep walk here, as an early mark would stick even if there are - // missing references that haven't been written yet! - err = s.walkObjectDeep(c, walked, + err = s.walkObject(c, walked, func(c cid.Cid) error { mark, err := markSet.Has(c) if err != nil { @@ -964,92 +901,16 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { return errStopWalk } - return nil - }, - func(c cid.Cid) error { count++ return markSet.Mark(c) }) if err != nil { - if xerrors.Is(err, bstore.ErrNotFound) { - log.Warnf("missing or incomplete object: %s", c) - missing = append(missing, c) - } else { - return xerrors.Errorf("error walking %s for marking: %w", c, err) - } + return xerrors.Errorf("error walking %s for marking: %w", c, err) } } - log.Infow("update marking set done", "took", time.Since(startMark), "marked", count, "missing", len(missing)) - } - - // 1.2 rescan for missing objects (after waiting a bit), as they might have not been copied yet - // by the vm at the time of the update walk. - if len(missing) > 0 { - try := 0 - - log.Info("rescanning for missing objects") - startMark = time.Now() - count = 0 - - for len(missing) > 0 { - if try > maxMissingScanRetries { - return xerrors.Errorf("failed to fully scan transactional refs; %d missing objects after %d attempts", len(missing), try) - } - - // discard previous walk short-cuts - walked := cid.NewSet() - towalk := missing - missing = nil - try++ - - log.Infof("rescanning for %d missing objects (attempt %d)", len(towalk), try) - // wait a minute first for in-flight writes to complete - time.Sleep(time.Minute) - - for _, c := range towalk { - // deep walk here again, as we are concerned about internal references not having been written - err = s.walkObjectDeep(c, walked, - func(c cid.Cid) error { - mark, err := markSet.Has(c) - if err != nil { - return xerrors.Errorf("error checking mark set for %s: %w", c, err) - } - - if mark { - return errStopWalk - } - - // see comment above for this check - isOldBlock, err := s.isOldBlockHeader(c, lookbackEpoch) - if err != nil { - return xerrors.Errorf("error checking object type for %s: %w", c, err) - } - - if isOldBlock { - return errStopWalk - } - - return nil - }, - func(c cid.Cid) error { - count++ - return markSet.Mark(c) - }) - - if err != nil { - if xerrors.Is(err, bstore.ErrNotFound) { - log.Warnf("missing or incomplete object: %s", c) - missing = append(missing, c) - } else { - return xerrors.Errorf("error walking %s for marking: %w", c, err) - } - } - } - } - - log.Infow("rescanning done", "took", time.Since(startMark), "marked", count) + log.Infow("update marking set done", "took", time.Since(startMark), "marked", count) } // 2. iterate through the hotstore to collect cold objects