short-circuit marking walks when encountering a block and more efficient walking
This commit is contained in:
parent
736d6a3c19
commit
8157f889ce
@ -77,6 +77,9 @@ var (
|
||||
|
||||
log = logging.Logger("splitstore")
|
||||
|
||||
// used to signal end of walk
|
||||
errStopWalk = errors.New("stop walk")
|
||||
|
||||
// set this to true if you are debugging the splitstore to enable debug logging
|
||||
enableDebugLog = false
|
||||
// set this to true if you want to track origin stack traces in the write log
|
||||
@ -636,6 +639,16 @@ func (s *SplitStore) flushPendingWrites(locked bool) {
|
||||
seen[c] = struct{}{}
|
||||
}
|
||||
|
||||
// if it is a block reference, short-circuit or else we'll end up walking the entire chain
|
||||
isBlock, err := s.isBlockHeader(c)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error determining if cid %s is a block header: %w", c, err)
|
||||
}
|
||||
|
||||
if isBlock {
|
||||
return errStopWalk
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
@ -660,6 +673,16 @@ func (s *SplitStore) isVMCopyContext() bool {
|
||||
return strings.Contains(sk, "filecoin-project/lotus/chain/vm.Copy")
|
||||
}
|
||||
|
||||
func (s *SplitStore) isBlockHeader(c cid.Cid) (isBlock bool, err error) {
|
||||
err = s.view(c, func(data []byte) error {
|
||||
var hdr types.BlockHeader
|
||||
isBlock = hdr.UnmarshalCBOR(bytes.NewBuffer(data)) == nil
|
||||
return nil
|
||||
})
|
||||
|
||||
return isBlock, err
|
||||
}
|
||||
|
||||
func (s *SplitStore) trackTxnRef(c cid.Cid, implicit bool) error {
|
||||
if s.txnProtect == nil {
|
||||
// not compacting
|
||||
@ -682,17 +705,25 @@ func (s *SplitStore) trackTxnRef(c cid.Cid, implicit bool) error {
|
||||
err = s.txnProtect.Mark(c)
|
||||
} else {
|
||||
err = s.walkLinks(c, cid.NewSet(), func(c cid.Cid) error {
|
||||
// this check is necessary to avoid races because objects are purged in random order
|
||||
has, err := s.hot.Has(c)
|
||||
// check if it is a block; implicitly checks if the object exists --if it doesn't because
|
||||
// it has been purged, it will be an error
|
||||
isBlock, err := s.isBlockHeader(c)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error determining if cid %s is a block header: %w", c, err)
|
||||
}
|
||||
|
||||
// mark the object
|
||||
err = s.txnProtect.Mark(c)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if !has {
|
||||
return xerrors.Errorf("object (cid: %s) has been purged", c)
|
||||
// if it is a block reference, short-circuit or else we'll end up walking the entire chain
|
||||
if isBlock {
|
||||
return errStopWalk
|
||||
}
|
||||
|
||||
return s.txnProtect.Mark(c)
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
@ -1142,13 +1173,12 @@ func (s *SplitStore) walk(ts *types.TipSet, boundary abi.ChainEpoch, inclMsgs, f
|
||||
return err
|
||||
}
|
||||
|
||||
blk, err := s.get(c)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error retrieving block (cid: %s): %w", c, err)
|
||||
}
|
||||
|
||||
var hdr types.BlockHeader
|
||||
if err := hdr.UnmarshalCBOR(bytes.NewBuffer(blk.RawData())); err != nil {
|
||||
err := s.view(c, func(data []byte) error {
|
||||
return hdr.UnmarshalCBOR(bytes.NewBuffer(data))
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error unmarshaling block header (cid: %s): %w", c, err)
|
||||
}
|
||||
|
||||
@ -1203,6 +1233,10 @@ func (s *SplitStore) walkLinks(c cid.Cid, walked *cid.Set, f func(cid.Cid) error
|
||||
}
|
||||
|
||||
if err := f(c); err != nil {
|
||||
if err == errStopWalk {
|
||||
return nil
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
@ -1210,40 +1244,36 @@ func (s *SplitStore) walkLinks(c cid.Cid, walked *cid.Set, f func(cid.Cid) error
|
||||
return nil
|
||||
}
|
||||
|
||||
blk, err := s.get(c)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error retrieving linked block (cid: %s): %w", c, err)
|
||||
}
|
||||
|
||||
var rerr error
|
||||
err = cbg.ScanForLinks(bytes.NewReader(blk.RawData()), func(c cid.Cid) {
|
||||
if rerr != nil {
|
||||
return
|
||||
}
|
||||
|
||||
err := s.walkLinks(c, walked, f)
|
||||
if err != nil {
|
||||
rerr = err
|
||||
}
|
||||
var links []cid.Cid
|
||||
err := s.view(c, func(data []byte) error {
|
||||
return cbg.ScanForLinks(bytes.NewReader(data), func(c cid.Cid) {
|
||||
links = append(links, c)
|
||||
})
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error scanning links (cid: %s): %w", c, err)
|
||||
return xerrors.Errorf("error scanning linked block (cid: %s): %w", c, err)
|
||||
}
|
||||
|
||||
return rerr
|
||||
for _, c := range links {
|
||||
err := s.walkLinks(c, walked, f)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error walking link (cid: %s): %w", c, err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// internal version used by walk so that we don't blow the txn
|
||||
func (s *SplitStore) get(cid cid.Cid) (blocks.Block, error) {
|
||||
blk, err := s.hot.Get(cid)
|
||||
|
||||
// internal version used by walk
|
||||
func (s *SplitStore) view(cid cid.Cid, cb func([]byte) error) error {
|
||||
err := s.hot.View(cid, cb)
|
||||
switch err {
|
||||
case bstore.ErrNotFound:
|
||||
return s.cold.Get(cid)
|
||||
return s.cold.View(cid, cb)
|
||||
|
||||
default:
|
||||
return blk, err
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user