smarter trackTxnRefMany
This commit is contained in:
parent
f124389b66
commit
13d612f72f
@ -605,16 +605,58 @@ func (s *SplitStore) trackTxnRef(c cid.Cid, deep bool) error {
|
|||||||
|
|
||||||
// we have finished marking, protect the reference
|
// we have finished marking, protect the reference
|
||||||
if !deep {
|
if !deep {
|
||||||
return s.doTxnProtect(c)
|
return s.doTxnProtect(c, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
return s.doTxnProtectDeep(c)
|
return s.doTxnProtectDeep(c)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SplitStore) doTxnProtect(c cid.Cid) error {
|
func (s *SplitStore) trackTxnRefMany(cids []cid.Cid) error {
|
||||||
|
if !s.txnActive {
|
||||||
|
// not compacting
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if s.txnRefs != nil {
|
||||||
|
// we haven't finished marking yet, so track the reference
|
||||||
|
s.txnRefsMx.Lock()
|
||||||
|
for _, c := range cids {
|
||||||
|
s.txnRefs[c] = struct{}{}
|
||||||
|
}
|
||||||
|
s.txnRefsMx.Unlock()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// we have finished marking, protect the refs
|
||||||
|
batch := make(map[cid.Cid]struct{}, len(cids))
|
||||||
|
for _, c := range cids {
|
||||||
|
batch[c] = struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, c := range cids {
|
||||||
|
err := s.doTxnProtect(c, batch)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *SplitStore) doTxnProtect(root cid.Cid, batch map[cid.Cid]struct{}) error {
|
||||||
// it's a shallow reference, protect with a standard walk without occur checking
|
// it's a shallow reference, protect with a standard walk without occur checking
|
||||||
return s.walkObject(c, cid.NewSet(),
|
return s.walkObject(root, cid.NewSet(),
|
||||||
func(c cid.Cid) error {
|
func(c cid.Cid) error {
|
||||||
|
if c != root {
|
||||||
|
_, ok := batch[c]
|
||||||
|
if ok {
|
||||||
|
// it's on the same batch, stop walk
|
||||||
|
// this check is necessary as the object may contain references to objects
|
||||||
|
// in the same batch (yet to be written) that cannot be loaded for the walk
|
||||||
|
return errStopWalk
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
mark, err := s.txnMarkSet.Has(c)
|
mark, err := s.txnMarkSet.Has(c)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("error checking mark set for %s: %w", c, err)
|
return xerrors.Errorf("error checking mark set for %s: %w", c, err)
|
||||||
@ -649,12 +691,12 @@ func (s *SplitStore) doTxnProtect(c cid.Cid) error {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SplitStore) doTxnProtectDeep(c cid.Cid) error {
|
func (s *SplitStore) doTxnProtectDeep(root cid.Cid) error {
|
||||||
// it's a deep reference potentially in vm context
|
// it's a deep reference potentially in vm context
|
||||||
// we do a deep walk to visit the children first, short-circuiting if the parent has been marked.
|
// we do a deep walk to visit the children first, short-circuiting if the parent has been marked.
|
||||||
// the deep walk is necessary as internal references may be missing, e.g. because a defunct object
|
// the deep walk is necessary as internal references may be missing, e.g. because a defunct object
|
||||||
// got recreated by the VM.
|
// got recreated by the VM.
|
||||||
return s.walkObjectDeep(c, cid.NewSet(),
|
return s.walkObjectDeep(root, cid.NewSet(),
|
||||||
func(c cid.Cid) error {
|
func(c cid.Cid) error {
|
||||||
mark, err := s.txnMarkSet.Has(c)
|
mark, err := s.txnMarkSet.Has(c)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -701,33 +743,6 @@ func (s *SplitStore) doTxnProtectDeep(c cid.Cid) error {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SplitStore) trackTxnRefMany(cids []cid.Cid) error {
|
|
||||||
if !s.txnActive {
|
|
||||||
// not compacting
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
if s.txnRefs != nil {
|
|
||||||
// we haven't finished marking yet, so track the reference
|
|
||||||
s.txnRefsMx.Lock()
|
|
||||||
for _, c := range cids {
|
|
||||||
s.txnRefs[c] = struct{}{}
|
|
||||||
}
|
|
||||||
s.txnRefsMx.Unlock()
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// we have finished marking, shallow protect the refs
|
|
||||||
for _, c := range cids {
|
|
||||||
err := s.txnProtect.Mark(c)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *SplitStore) warmup(curTs *types.TipSet) error {
|
func (s *SplitStore) warmup(curTs *types.TipSet) error {
|
||||||
if !atomic.CompareAndSwapInt32(&s.compacting, 0, 1) {
|
if !atomic.CompareAndSwapInt32(&s.compacting, 0, 1) {
|
||||||
return xerrors.Errorf("error locking compaction")
|
return xerrors.Errorf("error locking compaction")
|
||||||
|
Loading…
Reference in New Issue
Block a user