mark tipset references to protect them during critical section
This commit is contained in:
parent
ee63be26a1
commit
c9bd5ec452
@ -140,9 +140,9 @@ func (s *SplitStore) isNearUpgrade(epoch abi.ChainEpoch) bool {
|
|||||||
// transactionally protect incoming tipsets
|
// transactionally protect incoming tipsets
|
||||||
func (s *SplitStore) protectTipSets(apply []*types.TipSet) {
|
func (s *SplitStore) protectTipSets(apply []*types.TipSet) {
|
||||||
s.txnLk.RLock()
|
s.txnLk.RLock()
|
||||||
defer s.txnLk.RUnlock()
|
|
||||||
|
|
||||||
if !s.txnActive {
|
if !s.txnActive {
|
||||||
|
s.txnLk.RUnlock()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -151,7 +151,81 @@ func (s *SplitStore) protectTipSets(apply []*types.TipSet) {
|
|||||||
cids = append(cids, ts.Cids()...)
|
cids = append(cids, ts.Cids()...)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if len(cids) == 0 {
|
||||||
|
s.txnLk.RUnlock()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// critical section
|
||||||
|
if s.txnMarkSet != nil {
|
||||||
|
go func() {
|
||||||
|
defer s.txnLk.RUnlock()
|
||||||
|
s.markTipSetRefs(cids)
|
||||||
|
}()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
s.trackTxnRefMany(cids)
|
s.trackTxnRefMany(cids)
|
||||||
|
s.txnLk.RUnlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *SplitStore) markTipSetRefs(cids []cid.Cid) {
|
||||||
|
log.Info("marking %d tipset refs", len(cids))
|
||||||
|
startMark := time.Now()
|
||||||
|
|
||||||
|
workch := make(chan cid.Cid, len(cids))
|
||||||
|
for _, c := range cids {
|
||||||
|
workch <- c
|
||||||
|
}
|
||||||
|
close(workch)
|
||||||
|
|
||||||
|
count := new(int32)
|
||||||
|
worker := func() error {
|
||||||
|
for c := range workch {
|
||||||
|
err := s.walkObject(c, newTmpVisitor(),
|
||||||
|
func(c cid.Cid) error {
|
||||||
|
if isUnitaryObject(c) {
|
||||||
|
return errStopWalk
|
||||||
|
}
|
||||||
|
|
||||||
|
visit, err := s.txnMarkSet.Visit(c)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("error visiting object: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !visit {
|
||||||
|
return errStopWalk
|
||||||
|
}
|
||||||
|
|
||||||
|
atomic.AddInt32(count, 1)
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
workers := runtime.NumCPU() / 2
|
||||||
|
if workers < 2 {
|
||||||
|
workers = 2
|
||||||
|
}
|
||||||
|
if workers > len(cids) {
|
||||||
|
workers = len(cids)
|
||||||
|
}
|
||||||
|
|
||||||
|
g := new(errgroup.Group)
|
||||||
|
for i := 0; i < workers; i++ {
|
||||||
|
g.Go(worker)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := g.Wait(); err != nil {
|
||||||
|
log.Errorf("error marking tipset refs: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Infow("marking tipset refs done", "took", time.Since(startMark), "marked", *count)
|
||||||
}
|
}
|
||||||
|
|
||||||
// transactionally protect a view
|
// transactionally protect a view
|
||||||
|
Loading…
Reference in New Issue
Block a user