parallelize walkChain

This commit is contained in:
vyzo 2022-01-25 17:29:02 +02:00
parent 25284b5325
commit 7c8edf5632
2 changed files with 69 additions and 19 deletions

View File

@ -5,6 +5,7 @@ import (
"errors" "errors"
"runtime" "runtime"
"sort" "sort"
"sync"
"sync/atomic" "sync/atomic"
"time" "time"
@ -306,7 +307,7 @@ func (s *SplitStore) doTxnProtect(root cid.Cid, markSet MarkSet) error {
// Note: cold objects are deleted heaviest first, so the consituents of an object // Note: cold objects are deleted heaviest first, so the consituents of an object
// cannot be deleted before the object itself. // cannot be deleted before the object itself.
return s.walkObjectIncomplete(root, tmpVisitor(), return s.walkObjectIncomplete(root, newTmpVisitor(),
func(c cid.Cid) error { func(c cid.Cid) error {
if isUnitaryObject(c) { if isUnitaryObject(c) {
return errStopWalk return errStopWalk
@ -621,26 +622,31 @@ func (s *SplitStore) endTxnProtect() {
func (s *SplitStore) walkChain(ts *types.TipSet, inclState, inclMsgs abi.ChainEpoch, func (s *SplitStore) walkChain(ts *types.TipSet, inclState, inclMsgs abi.ChainEpoch,
visitor ObjectVisitor, f func(cid.Cid) error) error { visitor ObjectVisitor, f func(cid.Cid) error) error {
var walked *cid.Set var walked ObjectVisitor
var mx sync.Mutex
toWalk := ts.Cids() toWalk := ts.Cids()
walkCnt := 0 walkCnt := new(int64)
scanCnt := 0 scanCnt := new(int64)
stopWalk := func(_ cid.Cid) error { return errStopWalk } stopWalk := func(_ cid.Cid) error { return errStopWalk }
walkBlock := func(c cid.Cid) error { walkBlock := func(c cid.Cid) error {
if !walked.Visit(c) { visit, err := walked.Visit(c)
if err != nil {
return err
}
if !visit {
return nil return nil
} }
walkCnt++ atomic.AddInt64(walkCnt, 1)
if err := f(c); err != nil { if err := f(c); err != nil {
return err return err
} }
var hdr types.BlockHeader var hdr types.BlockHeader
err := s.view(c, func(data []byte) error { err = s.view(c, func(data []byte) error {
return hdr.UnmarshalCBOR(bytes.NewBuffer(data)) return hdr.UnmarshalCBOR(bytes.NewBuffer(data))
}) })
@ -676,16 +682,23 @@ func (s *SplitStore) walkChain(ts *types.TipSet, inclState, inclMsgs abi.ChainEp
if err := s.walkObject(hdr.ParentStateRoot, visitor, f); err != nil { if err := s.walkObject(hdr.ParentStateRoot, visitor, f); err != nil {
return xerrors.Errorf("error walking state root (cid: %s): %w", hdr.ParentStateRoot, err) return xerrors.Errorf("error walking state root (cid: %s): %w", hdr.ParentStateRoot, err)
} }
scanCnt++ atomic.AddInt64(scanCnt, 1)
} }
if hdr.Height > 0 { if hdr.Height > 0 {
mx.Lock()
toWalk = append(toWalk, hdr.Parents...) toWalk = append(toWalk, hdr.Parents...)
mx.Unlock()
} }
return nil return nil
} }
workers := runtime.NumCPU() / 2
if workers < 2 {
workers = 2
}
for len(toWalk) > 0 { for len(toWalk) > 0 {
// walking can take a while, so check this with every opportunity // walking can take a while, so check this with every opportunity
if err := s.checkClosing(); err != nil { if err := s.checkClosing(); err != nil {
@ -695,17 +708,34 @@ func (s *SplitStore) walkChain(ts *types.TipSet, inclState, inclMsgs abi.ChainEp
// the walk is BFS, so we can reset the walked set in every iteration and avoid building up // the walk is BFS, so we can reset the walked set in every iteration and avoid building up
// a set that contains all blocks (1M epochs -> 5M blocks -> 200MB worth of memory and growing // a set that contains all blocks (1M epochs -> 5M blocks -> 200MB worth of memory and growing
// over time) // over time)
walked = cid.NewSet() walked = newConcurrentVisitor()
walking := toWalk walking := toWalk
toWalk = nil toWalk = nil
workch := make(chan cid.Cid, len(walking))
for _, c := range walking { for _, c := range walking {
if err := walkBlock(c); err != nil { workch <- c
return xerrors.Errorf("error walking block (cid: %s): %w", c, err) }
} close(workch)
g := new(errgroup.Group)
for i := 0; i < workers; i++ {
g.Go(func() error {
for c := range workch {
if err := walkBlock(c); err != nil {
return xerrors.Errorf("error walking block (cid: %s): %w", c, err)
}
}
return nil
})
}
if err := g.Wait(); err != nil {
return err
} }
} }
log.Infow("chain walk done", "walked", walkCnt, "scanned", scanCnt) log.Infow("chain walk done", "walked", *walkCnt, "scanned", *scanCnt)
return nil return nil
} }
@ -1106,7 +1136,7 @@ func (s *SplitStore) waitForMissingRefs(markSet MarkSet) {
} }
towalk := missing towalk := missing
visitor := tmpVisitor() visitor := newTmpVisitor()
missing = make(map[cid.Cid]struct{}) missing = make(map[cid.Cid]struct{})
for c := range towalk { for c := range towalk {

View File

@ -1,6 +1,8 @@
package splitstore package splitstore
import ( import (
"sync"
cid "github.com/ipfs/go-cid" cid "github.com/ipfs/go-cid"
) )
@ -17,16 +19,34 @@ func (v *noopVisitor) Visit(_ cid.Cid) (bool, error) {
return true, nil return true, nil
} }
type cidSetVisitor struct { type tmpVisitor struct {
set *cid.Set set *cid.Set
} }
var _ ObjectVisitor = (*cidSetVisitor)(nil) var _ ObjectVisitor = (*tmpVisitor)(nil)
func (v *cidSetVisitor) Visit(c cid.Cid) (bool, error) { func (v *tmpVisitor) Visit(c cid.Cid) (bool, error) {
return v.set.Visit(c), nil return v.set.Visit(c), nil
} }
func tmpVisitor() ObjectVisitor { func newTmpVisitor() ObjectVisitor {
return &cidSetVisitor{set: cid.NewSet()} return &tmpVisitor{set: cid.NewSet()}
}
type concurrentVisitor struct {
mx sync.Mutex
set *cid.Set
}
var _ ObjectVisitor = (*concurrentVisitor)(nil)
func newConcurrentVisitor() *concurrentVisitor {
return &concurrentVisitor{set: cid.NewSet()}
}
func (v *concurrentVisitor) Visit(c cid.Cid) (bool, error) {
v.mx.Lock()
defer v.mx.Unlock()
return v.set.Visit(c), nil
} }