From 680af8eb09276c8b27096c777a6fbce4c934acdd Mon Sep 17 00:00:00 2001
From: vyzo
Date: Sun, 4 Jul 2021 18:37:53 +0300
Subject: [PATCH] use deep object walking for more robust handling of
 transactional references

---
 blockstore/splitstore/splitstore.go | 309 +++++++++++++++++----------
 1 file changed, 193 insertions(+), 116 deletions(-)

diff --git a/blockstore/splitstore/splitstore.go b/blockstore/splitstore/splitstore.go
index 5ccdf80a5..58d9fa282 100644
--- a/blockstore/splitstore/splitstore.go
+++ b/blockstore/splitstore/splitstore.go
@@ -141,13 +141,14 @@ type SplitStore struct {
 	debug *debugLog
 
 	// protection for concurrent read/writes during compaction
-	txnLk      sync.RWMutex
-	txnActive  bool
-	txnEnv     MarkSetEnv
-	txnProtect MarkSet
-	txnMarkSet MarkSet
-	txnRefsMx  sync.Mutex
-	txnRefs    map[cid.Cid]struct{}
+	txnLk            sync.RWMutex
+	txnActive        bool
+	txnBoundaryEpoch abi.ChainEpoch
+	txnEnv           MarkSetEnv
+	txnProtect       MarkSet
+	txnMarkSet       MarkSet
+	txnRefsMx        sync.Mutex
+	txnRefs          map[cid.Cid]struct{}
 }
 
 var _ bstore.Blockstore = (*SplitStore)(nil)
@@ -600,45 +601,59 @@ func (s *SplitStore) trackTxnRef(c cid.Cid, recursive bool) error {
 
 	// we have finished marking, protect the reference
 	if !recursive {
+		// shallow protect
 		return s.txnProtect.Mark(c)
 	}
 
 	// it's a recursive reference in vm context, protect links if they are not in the markset already
-	return s.walkObject(c, cid.NewSet(), func(c cid.Cid) error {
-		mark, err := s.txnMarkSet.Has(c)
-		if err != nil {
-			return xerrors.Errorf("error checking mark set for %s: %w", c, err)
-		}
+	// we do a deep walk to visit the children first, short-circuiting if the parent has been marked.
+	// the deep walk is necessary as internal references may be missing, e.g. because a defunct object
+	// got recreated by the VM.
+	return s.walkObjectDeep(c, cid.NewSet(),
+		func(c cid.Cid) error {
+			mark, err := s.txnMarkSet.Has(c)
+			if err != nil {
+				return xerrors.Errorf("error checking mark set for %s: %w", c, err)
+			}
 
-		// it's marked, nothing to do
-		if mark {
-			return errStopWalk
-		}
+			// it's marked, nothing to do
+			if mark {
+				return errStopWalk
+			}
 
-		live, err := s.txnProtect.Has(c)
-		if err != nil {
-			return xerrors.Errorf("error checking portected set for %s: %w", c, err)
-		}
+			// old block reference -- see comment in doCompact about the necessity of this
+			isOldBlock, err := s.isOldBlockHeader(c, s.txnBoundaryEpoch)
+			if err != nil {
+				return xerrors.Errorf("error checking object type for %s: %w", c, err)
+			}
 
-		if live {
-			return errStopWalk
-		}
+			if isOldBlock {
+				return errStopWalk
+			}
 
-		// this occurs check is necessary because cold objects are purged in arbitrary order
-		has, err := s.hot.Has(c)
-		if err != nil {
-			return xerrors.Errorf("error checking hotstore for %s: %w", c, err)
-		}
+			return nil
+		},
+		func(c cid.Cid) error {
+			// this occurs check is necessary because cold objects are purged in arbitrary order
+			has, err := s.hot.Has(c)
+			if err != nil {
+				return xerrors.Errorf("error checking hotstore for %s: %w", c, err)
+			}
 
-		// it's not there (might have been deleted), signal to the vm to copy
-		if !has {
-			log.Warnf("missing object for recursive reference to %s", c)
-			return errMissingObject
-		}
+			// it's not there (might have been deleted), signal to the vm to copy
+			if !has {
+				log.Warnf("missing object for recursive reference to %s", c)
+				return errMissingObject
+			}
 
-		// mark it
-		return s.txnProtect.Mark(c)
-	})
+			// mark it in *both* sets, so that we can short-circuit a concurrent walk.
+			err = s.txnMarkSet.Mark(c)
+			if err != nil {
+				return xerrors.Errorf("error marking %s: %w", c, err)
+			}
+
+			return s.txnProtect.Mark(c)
+		})
 }
 
 func (s *SplitStore) trackTxnRefMany(cids []cid.Cid) error {
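The heart of this hunk is the two-callback walk: the pre callback runs top-down and may prune a whole subtree by returning errStopWalk, while the second callback runs bottom-up, only after every child has been handled. A minimal, self-contained sketch of that contract on a toy DAG (illustrative only -- walkDeep and the string-keyed DAG are stand-ins, not the splitstore API):

package main

import (
	"errors"
	"fmt"
)

// errStopWalk mirrors the sentinel the splitstore uses to prune a subtree.
var errStopWalk = errors.New("stop walk")

// walkDeep visits children before their parent (post-order), calling pre at
// each node first; if pre returns errStopWalk the whole subtree is skipped.
func walkDeep(dag map[string][]string, node string, visited map[string]bool,
	pre, post func(string) error) error {
	if visited[node] {
		return nil
	}
	visited[node] = true

	if err := pre(node); err != nil {
		if errors.Is(err, errStopWalk) {
			return nil // prune: skip this node and everything below it
		}
		return err
	}

	for _, child := range dag[node] {
		if err := walkDeep(dag, child, visited, pre, post); err != nil {
			return err
		}
	}

	// the node itself is handled only after all of its children
	return post(node)
}

func main() {
	dag := map[string][]string{"root": {"a", "b"}, "a": {"c"}, "b": {"c"}}
	_ = walkDeep(dag, "root", map[string]bool{},
		func(n string) error { return nil }, // pre: no pruning here
		func(n string) error { fmt.Println("protect", n); return nil }, // prints c, a, b, root
	)
}

Because "handled" propagates leaves-first, observing a handled parent implies the whole subtree is handled -- the invariant the rest of the patch leans on.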
@@ -657,7 +672,7 @@ func (s *SplitStore) trackTxnRefMany(cids []cid.Cid) error {
 		return nil
 	}
 
-	// we have finished marking, protect the refs
+	// we have finished marking, shallow protect the refs
 	for _, c := range cids {
 		err := s.txnProtect.Mark(c)
 		if err != nil {
@@ -794,6 +809,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
 	s.txnLk.Lock()
 	s.txnRefs = make(map[cid.Cid]struct{})
 	s.txnActive = true
+	s.txnBoundaryEpoch = boundaryEpoch
 	s.txnLk.Unlock()
 
 	// 1. mark reachable objects by walking the chain from the current epoch to the boundary epoch
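For orientation, the transactional state this hunk initializes can be modeled in isolation. A sketch under assumed names (txnState, beginTxn, and trackWrite are illustrative; the real fields live directly on SplitStore, as the first hunk shows):

package example

import (
	"sync"

	"github.com/filecoin-project/go-state-types/abi"
	cid "github.com/ipfs/go-cid"
)

type txnState struct {
	lk       sync.RWMutex // writers hold RLock; the compactor holds Lock during setup
	active   bool
	boundary abi.ChainEpoch

	refsMx sync.Mutex // guards the refs map against concurrent writers
	refs   map[cid.Cid]struct{}
}

// beginTxn mirrors the setup in doCompact: everything is published under the
// write lock, so a writer that observes active == true also observes the
// boundary epoch and a fresh refs map.
func (t *txnState) beginTxn(boundary abi.ChainEpoch) {
	t.lk.Lock()
	t.refs = make(map[cid.Cid]struct{})
	t.active = true
	t.boundary = boundary
	t.lk.Unlock()
}

// trackWrite is the writer side: while a compaction transaction is active,
// every written CID is recorded so the compactor can mark it in step 1.1.
func (t *txnState) trackWrite(c cid.Cid) {
	t.lk.RLock()
	defer t.lk.RUnlock()
	if !t.active {
		return
	}

	t.refsMx.Lock()
	t.refs[c] = struct{}{}
	t.refsMx.Unlock()
}

Recording the boundary under the same lock matters because the old-block short-circuit in trackTxnRef compares header heights against s.txnBoundaryEpoch; publishing it together with txnActive keeps that comparison consistent for any writer that sees the active transaction.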
@@ -839,61 +855,89 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
 	}()
 
 	// 1.1 Update markset for references created during marking
-	log.Info("updating mark set for live references")
-	startMark = time.Now()
-	walked := cid.NewSet()
-	count = 0
 	var missing []cid.Cid
-	for c := range txnRefs {
-		mark, err := markSet.Has(c)
-		if err != nil {
-			return xerrors.Errorf("error checking markset for %s: %w", c, err)
-		}
+	if len(txnRefs) > 0 {
+		log.Infow("updating mark set for live references", "refs", len(txnRefs))
+		startMark = time.Now()
+		walked := cid.NewSet()
+		count = 0
 
-		if mark {
-			continue
-		}
-
-		err = s.walkObject(c, walked, func(c cid.Cid) error {
+		for c := range txnRefs {
 			mark, err := markSet.Has(c)
 			if err != nil {
 				return xerrors.Errorf("error checking markset for %s: %w", c, err)
 			}
 
 			if mark {
-				return errStopWalk
+				continue
 			}
 
-			count++
-			return markSet.Mark(c)
-		})
+			// we have to do a deep walk here, as an early mark would stick even if there are
+			// missing references that haven't been written yet!
+			err = s.walkObjectDeep(c, walked,
+				func(c cid.Cid) error {
+					mark, err := markSet.Has(c)
+					if err != nil {
+						return xerrors.Errorf("error checking markset for %s: %w", c, err)
+					}
 
-		if err != nil {
-			if xerrors.Is(err, bstore.ErrNotFound) {
-				log.Warnf("missing or incomplete object: %s", c)
-				missing = append(missing, c)
-			} else {
-				return xerrors.Errorf("error walking %s for marking: %w", c, err)
+					if mark {
+						return errStopWalk
+					}
+
+					// we also short-circuit on old blocks, as these can come from a network request
+					// and cause us to fail because we have purged their constituents (or they may be
+					// missing from the beginning in case of snapshot sync, e.g. parent message receipts
+					// or old messages); if these blocks are on our chain, they would have been marked,
+					// but they might be from a fork.
+					//
+					// Ideally, we would have API options to preclude us from tracking references to such
+					// objects, but we don't, so we have to do this check.
+					isOldBlock, err := s.isOldBlockHeader(c, boundaryEpoch)
+					if err != nil {
+						return xerrors.Errorf("error checking object type for %s: %w", c, err)
+					}
+
+					if isOldBlock {
+						return errStopWalk
+					}
+
+					return nil
+				},
+				func(c cid.Cid) error {
+					count++
+					return markSet.Mark(c)
+				})
+
+			if err != nil {
+				if xerrors.Is(err, bstore.ErrNotFound) {
+					log.Warnf("missing or incomplete object: %s", c)
+					missing = append(missing, c)
+				} else {
+					return xerrors.Errorf("error walking %s for marking: %w", c, err)
+				}
 			}
 		}
-	}
 
-	log.Infow("update marking set done", "took", time.Since(startMark), "marked", count, "missing", len(missing))
-	// 1.2 rescan for missing objects (after waiting a minute), as they might have not been copied yet
-	// by the vm
+		log.Infow("update marking set done", "took", time.Since(startMark), "marked", count, "missing", len(missing))
+	}
+
+	// 1.2 rescan for missing objects (after waiting a bit), as they might have not been copied yet
+	// by the vm at the time of the update walk.
 	if len(missing) > 0 {
 		try := 0
 
 		log.Info("rescanning for missing objects")
 		startMark = time.Now()
+		count = 0
 
 		for len(missing) > 0 {
			if try > maxMissingScanRetries {
-				return xerrors.Errorf("failed to fully scan transactional refs; %d missing objects", len(missing))
+				return xerrors.Errorf("failed to fully scan transactional refs; %d missing objects after %d attempts", len(missing), try)
 			}
 
 			// discard previous walk short-cuts
-			walked = cid.NewSet()
+			walked := cid.NewSet()
 			towalk := missing
 			missing = nil
 			try++
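Before the hunk that rewrites the rescan walk itself, the control flow of this retry loop is worth distilling. A runnable sketch (process, maxRetries, and errNotFound are placeholders for the real deep walk, maxMissingScanRetries, and bstore.ErrNotFound):

package example

import (
	"errors"
	"fmt"
	"time"
)

var errNotFound = errors.New("not found")

// rescan drains a worklist of missing objects with bounded retries. Each
// pass waits, then re-walks only what the previous pass could not find.
func rescan(missing []string, process func(string) error, maxRetries int) error {
	try := 0
	for len(missing) > 0 {
		if try > maxRetries {
			return fmt.Errorf("%d objects still missing after %d attempts", len(missing), try)
		}

		towalk := missing
		missing = nil // re-accumulate only what is still not found
		try++

		time.Sleep(time.Minute) // give the VM a chance to flush its writes

		for _, c := range towalk {
			err := process(c)
			if err == nil {
				continue
			}
			if errors.Is(err, errNotFound) {
				missing = append(missing, c) // retry on the next pass
				continue
			}
			return err
		}
	}
	return nil
}

Resetting the worklist before each pass means only objects that are still missing get retried, and the attempt counter turns a persistent hole into a hard error instead of spinning forever.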
@@ -903,44 +947,34 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
 			time.Sleep(time.Minute)
 
 			for _, c := range towalk {
-				// we can't reliably check the markset and short-circuit this time, we have to do full walks
-				// because the object was previously visited top-to-bottom, with root DAGs short circuiting
-				// their children.
-				// but we *can* short-circuit on the txn protection filter, as this implies that the object
-				// will be protected from purge.
-				err = s.walkObject(c, walked, func(c cid.Cid) error {
-					mark, err := s.txnProtect.Has(c)
-					if err != nil {
-						return xerrors.Errorf("error checking protected set for %s: %w", c, err)
-					}
+				// deep walk here again, as we are concerned about internal references not having been written
+				err = s.walkObjectDeep(c, walked,
+					func(c cid.Cid) error {
+						mark, err := markSet.Has(c)
+						if err != nil {
+							return xerrors.Errorf("error checking mark set for %s: %w", c, err)
+						}
 
-					if mark {
-						return errStopWalk
-					}
+						if mark {
+							return errStopWalk
+						}
 
-					// mark it
-					err = markSet.Mark(c)
-					if err != nil {
-						return err
-					}
+						// see comment above for this check
+						isOldBlock, err := s.isOldBlockHeader(c, boundaryEpoch)
+						if err != nil {
+							return xerrors.Errorf("error checking object type for %s: %w", c, err)
+						}
 
-					// we also short-circuit in case of a block header, as it may cause us to walk the
-					// entire chain because of a network request (and fail if we were synced form a snapshot
-					// because of missing messages or receipts!)
-					// this is necessary because we don't have interface options to signal network request
-					// initiated API calls; when we have that, we can stop tracking those references and
-					// we can remove this check.
-					isBlock, err := s.isBlockHeader(c)
-					if err != nil {
-						return xerrors.Errorf("error checking object type for %s: %W", c, err)
-					}
+						if isOldBlock {
+							return errStopWalk
+						}
 
-					if isBlock {
-						return errStopWalk
-					}
-
-					return nil
-				})
+						return nil
+					},
+					func(c cid.Cid) error {
+						count++
+						return markSet.Mark(c)
+					})
 
 				if err != nil {
 					if xerrors.Is(err, bstore.ErrNotFound) {
 						log.Warnf("missing or incomplete object: %s", c)
 						missing = append(missing, c)
 					} else {
 						return xerrors.Errorf("error walking %s for marking: %w", c, err)
 					}
 				}
 			}
 		}
 
-		log.Infow("rescanning done", "took", time.Since(startMark))
+		log.Infow("rescanning done", "took", time.Since(startMark), "marked", count)
 	}
 
 	// 2. iterate through the hotstore to collect cold objects
@@ -1112,20 +1146,6 @@ func (s *SplitStore) walkChain(ts *types.TipSet, boundary abi.ChainEpoch, inclMs
 	return nil
 }
 
-func (s *SplitStore) isBlockHeader(c cid.Cid) (isBlock bool, err error) {
-	if c.Prefix().Codec != cid.DagCBOR {
-		return false, nil
-	}
-
-	err = s.view(c, func(data []byte) error {
-		var hdr types.BlockHeader
-		isBlock = hdr.UnmarshalCBOR(bytes.NewBuffer(data)) == nil
-		return nil
-	})
-
-	return isBlock, err
-}
-
 func (s *SplitStore) walkObject(c cid.Cid, walked *cid.Set, f func(cid.Cid) error) error {
 	if !walked.Visit(c) {
 		return nil
 	}
@@ -1164,6 +1184,47 @@ func (s *SplitStore) walkObject(c cid.Cid, walked *cid.Set, f func(cid.Cid) erro
 	return nil
 }
 
+// like walkObject, but it visits leaves first, with pre invoked at the parent node to control
+// whether the walk should stop
+func (s *SplitStore) walkObjectDeep(c cid.Cid, walked *cid.Set,
+	pre func(cid.Cid) error, f func(cid.Cid) error) error {
+	if !walked.Visit(c) {
+		return nil
+	}
+
+	if err := pre(c); err != nil {
+		if err == errStopWalk {
+			return nil
+		}
+
+		return err
+	}
+
+	if c.Prefix().Codec != cid.DagCBOR {
+		return f(c)
+	}
+
+	var links []cid.Cid
+	err := s.view(c, func(data []byte) error {
+		return cbg.ScanForLinks(bytes.NewReader(data), func(c cid.Cid) {
+			links = append(links, c)
+		})
+	})
+
+	if err != nil {
+		return xerrors.Errorf("error scanning linked block (cid: %s): %w", c, err)
+	}
+
+	for _, c := range links {
+		err := s.walkObjectDeep(c, walked, pre, f)
+		if err != nil {
+			return xerrors.Errorf("error walking link (cid: %s): %w", c, err)
+		}
+	}
+
+	return f(c)
+}
+
 // internal version used by walk
 func (s *SplitStore) view(cid cid.Cid, cb func([]byte) error) error {
 	err := s.hot.View(cid, cb)
@@ -1176,6 +1237,22 @@ func (s *SplitStore) view(cid cid.Cid, cb func([]byte) error) error {
 	}
 }
 
+func (s *SplitStore) isOldBlockHeader(c cid.Cid, epoch abi.ChainEpoch) (isOldBlock bool, err error) {
+	if c.Prefix().Codec != cid.DagCBOR {
+		return false, nil
+	}
+
+	err = s.view(c, func(data []byte) error {
+		var hdr types.BlockHeader
+		if hdr.UnmarshalCBOR(bytes.NewBuffer(data)) == nil {
+			isOldBlock = hdr.Height < epoch
+		}
+		return nil
+	})
+
+	return isOldBlock, err
+}
+
 func (s *SplitStore) moveColdBlocks(cold []cid.Cid) error {
 	batch := make([]blocks.Block, 0, batchSize)
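Taken together, walkObjectDeep and isOldBlockHeader give callers the leaves-first discipline used throughout the patch. A condensed caller-side sketch (the walker and markSet interfaces are stand-ins for *SplitStore and MarkSet; only walkObjectDeep's signature comes from the patch):

package example

import (
	"errors"

	cid "github.com/ipfs/go-cid"
)

var errStopWalk = errors.New("stop walk")

// markSet and walker are stand-ins for the patch's MarkSet and *SplitStore.
type markSet interface {
	Has(cid.Cid) (bool, error)
	Mark(cid.Cid) error
}

type walker interface {
	walkObjectDeep(c cid.Cid, walked *cid.Set, pre, post func(cid.Cid) error) error
}

// markRecursive marks everything reachable from root, children first.
func markRecursive(w walker, ms markSet, root cid.Cid) error {
	return w.walkObjectDeep(root, cid.NewSet(),
		func(c cid.Cid) error { // pre: top-down pruning
			marked, err := ms.Has(c)
			if err != nil {
				return err
			}
			if marked {
				return errStopWalk
			}
			return nil
		},
		func(c cid.Cid) error { // post: bottom-up effect
			return ms.Mark(c)
		})
}

Since the post callback never runs on a parent before all of its children have been marked, a marked parent proves its subgraph is complete -- which is why the pre filter here may prune on Has, and why trackTxnRef can short-circuit a concurrent walk.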