use visitors instead of cidsets in walks
This commit is contained in:
parent
1323d8fb20
commit
49346f5679
@ -83,7 +83,14 @@ func (s *SplitStore) doCheck(curTs *types.TipSet) error {
|
||||
write("--")
|
||||
|
||||
var coldCnt, missingCnt int64
|
||||
err = s.walkChain(curTs, boundaryEpoch, boundaryEpoch,
|
||||
|
||||
visitor, err := s.markSetEnv.CreateVisitor("check", 0)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error creating visitor: %w", err)
|
||||
}
|
||||
defer visitor.Close() //nolint
|
||||
|
||||
err = s.walkChain(curTs, boundaryEpoch, boundaryEpoch, visitor,
|
||||
func(c cid.Cid) error {
|
||||
if isUnitaryObject(c) {
|
||||
return errStopWalk
|
||||
|
@ -290,7 +290,7 @@ func (s *SplitStore) doTxnProtect(root cid.Cid, markSet MarkSet) error {
|
||||
|
||||
// Note: cold objects are deleted heaviest first, so the consituents of an object
|
||||
// cannot be deleted before the object itself.
|
||||
return s.walkObjectIncomplete(root, cid.NewSet(),
|
||||
return s.walkObjectIncomplete(root, tmpVisitor(),
|
||||
func(c cid.Cid) error {
|
||||
if isUnitaryObject(c) {
|
||||
return errStopWalk
|
||||
@ -410,12 +410,21 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
|
||||
startMark := time.Now()
|
||||
|
||||
var count int64
|
||||
err = s.walkChain(curTs, boundaryEpoch, inclMsgsEpoch,
|
||||
err = s.walkChain(curTs, boundaryEpoch, inclMsgsEpoch, &noopVisitor{},
|
||||
func(c cid.Cid) error {
|
||||
if isUnitaryObject(c) {
|
||||
return errStopWalk
|
||||
}
|
||||
|
||||
has, err := markSet.Has(c)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error checking markset: %w", err)
|
||||
}
|
||||
|
||||
if has {
|
||||
return errStopWalk
|
||||
}
|
||||
|
||||
count++
|
||||
return markSet.Mark(c)
|
||||
})
|
||||
@ -606,8 +615,7 @@ func (s *SplitStore) endTxnProtect() {
|
||||
}
|
||||
|
||||
func (s *SplitStore) walkChain(ts *types.TipSet, inclState, inclMsgs abi.ChainEpoch,
|
||||
f func(cid.Cid) error) error {
|
||||
visited := cid.NewSet()
|
||||
visitor ObjectVisitor, f func(cid.Cid) error) error {
|
||||
walked := cid.NewSet()
|
||||
toWalk := ts.Cids()
|
||||
walkCnt := 0
|
||||
@ -616,7 +624,7 @@ func (s *SplitStore) walkChain(ts *types.TipSet, inclState, inclMsgs abi.ChainEp
|
||||
stopWalk := func(_ cid.Cid) error { return errStopWalk }
|
||||
|
||||
walkBlock := func(c cid.Cid) error {
|
||||
if !visited.Visit(c) {
|
||||
if !walked.Visit(c) {
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -640,19 +648,19 @@ func (s *SplitStore) walkChain(ts *types.TipSet, inclState, inclMsgs abi.ChainEp
|
||||
if inclMsgs < inclState {
|
||||
// we need to use walkObjectIncomplete here, as messages/receipts may be missing early on if we
|
||||
// synced from snapshot and have a long HotStoreMessageRetentionPolicy.
|
||||
if err := s.walkObjectIncomplete(hdr.Messages, walked, f, stopWalk); err != nil {
|
||||
if err := s.walkObjectIncomplete(hdr.Messages, visitor, f, stopWalk); err != nil {
|
||||
return xerrors.Errorf("error walking messages (cid: %s): %w", hdr.Messages, err)
|
||||
}
|
||||
|
||||
if err := s.walkObjectIncomplete(hdr.ParentMessageReceipts, walked, f, stopWalk); err != nil {
|
||||
if err := s.walkObjectIncomplete(hdr.ParentMessageReceipts, visitor, f, stopWalk); err != nil {
|
||||
return xerrors.Errorf("error walking messages receipts (cid: %s): %w", hdr.ParentMessageReceipts, err)
|
||||
}
|
||||
} else {
|
||||
if err := s.walkObject(hdr.Messages, walked, f); err != nil {
|
||||
if err := s.walkObject(hdr.Messages, visitor, f); err != nil {
|
||||
return xerrors.Errorf("error walking messages (cid: %s): %w", hdr.Messages, err)
|
||||
}
|
||||
|
||||
if err := s.walkObject(hdr.ParentMessageReceipts, walked, f); err != nil {
|
||||
if err := s.walkObject(hdr.ParentMessageReceipts, visitor, f); err != nil {
|
||||
return xerrors.Errorf("error walking message receipts (cid: %s): %w", hdr.ParentMessageReceipts, err)
|
||||
}
|
||||
}
|
||||
@ -660,7 +668,7 @@ func (s *SplitStore) walkChain(ts *types.TipSet, inclState, inclMsgs abi.ChainEp
|
||||
|
||||
// state is only retained if within the inclState boundary, with the exception of genesis
|
||||
if hdr.Height >= inclState || hdr.Height == 0 {
|
||||
if err := s.walkObject(hdr.ParentStateRoot, walked, f); err != nil {
|
||||
if err := s.walkObject(hdr.ParentStateRoot, visitor, f); err != nil {
|
||||
return xerrors.Errorf("error walking state root (cid: %s): %w", hdr.ParentStateRoot, err)
|
||||
}
|
||||
scanCnt++
|
||||
@ -693,8 +701,13 @@ func (s *SplitStore) walkChain(ts *types.TipSet, inclState, inclMsgs abi.ChainEp
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *SplitStore) walkObject(c cid.Cid, walked *cid.Set, f func(cid.Cid) error) error {
|
||||
if !walked.Visit(c) {
|
||||
func (s *SplitStore) walkObject(c cid.Cid, visitor ObjectVisitor, f func(cid.Cid) error) error {
|
||||
visit, err := visitor.Visit(c)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error visiting object: %w", err)
|
||||
}
|
||||
|
||||
if !visit {
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -716,7 +729,7 @@ func (s *SplitStore) walkObject(c cid.Cid, walked *cid.Set, f func(cid.Cid) erro
|
||||
}
|
||||
|
||||
var links []cid.Cid
|
||||
err := s.view(c, func(data []byte) error {
|
||||
err = s.view(c, func(data []byte) error {
|
||||
return cbg.ScanForLinks(bytes.NewReader(data), func(c cid.Cid) {
|
||||
links = append(links, c)
|
||||
})
|
||||
@ -727,7 +740,7 @@ func (s *SplitStore) walkObject(c cid.Cid, walked *cid.Set, f func(cid.Cid) erro
|
||||
}
|
||||
|
||||
for _, c := range links {
|
||||
err := s.walkObject(c, walked, f)
|
||||
err := s.walkObject(c, visitor, f)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error walking link (cid: %s): %w", c, err)
|
||||
}
|
||||
@ -737,8 +750,13 @@ func (s *SplitStore) walkObject(c cid.Cid, walked *cid.Set, f func(cid.Cid) erro
|
||||
}
|
||||
|
||||
// like walkObject, but the object may be potentially incomplete (references missing)
|
||||
func (s *SplitStore) walkObjectIncomplete(c cid.Cid, walked *cid.Set, f, missing func(cid.Cid) error) error {
|
||||
if !walked.Visit(c) {
|
||||
func (s *SplitStore) walkObjectIncomplete(c cid.Cid, visitor ObjectVisitor, f, missing func(cid.Cid) error) error {
|
||||
visit, err := visitor.Visit(c)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error visiting object: %w", err)
|
||||
}
|
||||
|
||||
if !visit {
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -777,7 +795,7 @@ func (s *SplitStore) walkObjectIncomplete(c cid.Cid, walked *cid.Set, f, missing
|
||||
}
|
||||
|
||||
var links []cid.Cid
|
||||
err := s.view(c, func(data []byte) error {
|
||||
err = s.view(c, func(data []byte) error {
|
||||
return cbg.ScanForLinks(bytes.NewReader(data), func(c cid.Cid) {
|
||||
links = append(links, c)
|
||||
})
|
||||
@ -788,7 +806,7 @@ func (s *SplitStore) walkObjectIncomplete(c cid.Cid, walked *cid.Set, f, missing
|
||||
}
|
||||
|
||||
for _, c := range links {
|
||||
err := s.walkObjectIncomplete(c, walked, f, missing)
|
||||
err := s.walkObjectIncomplete(c, visitor, f, missing)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error walking link (cid: %s): %w", c, err)
|
||||
}
|
||||
@ -1079,11 +1097,11 @@ func (s *SplitStore) waitForMissingRefs(markSet MarkSet) {
|
||||
}
|
||||
|
||||
towalk := missing
|
||||
walked := cid.NewSet()
|
||||
visitor := tmpVisitor()
|
||||
missing = make(map[cid.Cid]struct{})
|
||||
|
||||
for c := range towalk {
|
||||
err := s.walkObjectIncomplete(c, walked,
|
||||
err := s.walkObjectIncomplete(c, visitor,
|
||||
func(c cid.Cid) error {
|
||||
if isUnitaryObject(c) {
|
||||
return errStopWalk
|
||||
|
@ -59,7 +59,15 @@ func (s *SplitStore) doWarmup(curTs *types.TipSet) error {
|
||||
count := int64(0)
|
||||
xcount := int64(0)
|
||||
missing := int64(0)
|
||||
err := s.walkChain(curTs, boundaryEpoch, epoch+1, // we don't load messages/receipts in warmup
|
||||
|
||||
visitor, err := s.markSetEnv.CreateVisitor("warmup", 0)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error creating visitor: %w", err)
|
||||
}
|
||||
defer visitor.Close() //nolint
|
||||
|
||||
err = s.walkChain(curTs, boundaryEpoch, epoch+1, // we don't load messages/receipts in warmup
|
||||
visitor,
|
||||
func(c cid.Cid) error {
|
||||
if isUnitaryObject(c) {
|
||||
return errStopWalk
|
||||
|
@ -26,3 +26,7 @@ var _ ObjectVisitor = (*cidSetVisitor)(nil)
|
||||
func (v *cidSetVisitor) Visit(c cid.Cid) (bool, error) {
|
||||
return v.set.Visit(c), nil
|
||||
}
|
||||
|
||||
func tmpVisitor() ObjectVisitor {
|
||||
return &cidSetVisitor{set: cid.NewSet()}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user