Merge pull request #7999 from filecoin-project/feat/splistore-refactors

some basic splitstore refactors
This commit is contained in:
vyzo 2022-01-28 13:55:16 +02:00 committed by GitHub
commit ff10e0eaf1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 123 additions and 111 deletions

View File

@ -10,20 +10,12 @@ import (
var errMarkSetClosed = errors.New("markset closed")
// MarkSet is a utility to keep track of seen CID, and later query for them.
//
// * If the expected dataset is large, it can be backed by a datastore (e.g. bbolt).
// * If a probabilistic result is acceptable, it can be backed by a bloom filter
// MarkSet is an interface for tracking CIDs during chain and object walks
type MarkSet interface {
ObjectVisitor
Mark(cid.Cid) error
Has(cid.Cid) (bool, error)
Close() error
SetConcurrent()
}
type MarkSetVisitor interface {
MarkSet
ObjectVisitor
}
type MarkSetEnv interface {
@ -31,11 +23,7 @@ type MarkSetEnv interface {
// name is a unique name for this markset, mapped to the filesystem in disk-backed environments
// sizeHint is a hint about the expected size of the markset
Create(name string, sizeHint int64) (MarkSet, error)
// CreateVisitor is like Create, but returns a wider interface that supports atomic visits.
// It may not be supported by some markset types (e.g. bloom).
CreateVisitor(name string, sizeHint int64) (MarkSetVisitor, error)
// SupportsVisitor returns true if the marksets created by this environment support the visitor interface.
SupportsVisitor() bool
// Close closes the markset
Close() error
}

View File

@ -34,7 +34,6 @@ type BadgerMarkSet struct {
}
var _ MarkSet = (*BadgerMarkSet)(nil)
var _ MarkSetVisitor = (*BadgerMarkSet)(nil)
var badgerMarkSetBatchSize = 16384
@ -48,7 +47,7 @@ func NewBadgerMarkSetEnv(path string) (MarkSetEnv, error) {
return &BadgerMarkSetEnv{path: msPath}, nil
}
func (e *BadgerMarkSetEnv) create(name string, sizeHint int64) (*BadgerMarkSet, error) {
func (e *BadgerMarkSetEnv) Create(name string, sizeHint int64) (MarkSet, error) {
name += ".tmp"
path := filepath.Join(e.path, name)
@ -68,16 +67,6 @@ func (e *BadgerMarkSetEnv) create(name string, sizeHint int64) (*BadgerMarkSet,
return ms, nil
}
func (e *BadgerMarkSetEnv) Create(name string, sizeHint int64) (MarkSet, error) {
return e.create(name, sizeHint)
}
func (e *BadgerMarkSetEnv) CreateVisitor(name string, sizeHint int64) (MarkSetVisitor, error) {
return e.create(name, sizeHint)
}
func (e *BadgerMarkSetEnv) SupportsVisitor() bool { return true }
func (e *BadgerMarkSetEnv) Close() error {
return os.RemoveAll(e.path)
}

View File

@ -13,42 +13,27 @@ var _ MarkSetEnv = (*MapMarkSetEnv)(nil)
type MapMarkSet struct {
mx sync.RWMutex
set map[string]struct{}
ts bool
}
var _ MarkSet = (*MapMarkSet)(nil)
var _ MarkSetVisitor = (*MapMarkSet)(nil)
func NewMapMarkSetEnv() (*MapMarkSetEnv, error) {
return &MapMarkSetEnv{}, nil
}
func (e *MapMarkSetEnv) create(name string, sizeHint int64) (*MapMarkSet, error) {
func (e *MapMarkSetEnv) Create(name string, sizeHint int64) (MarkSet, error) {
return &MapMarkSet{
set: make(map[string]struct{}, sizeHint),
}, nil
}
func (e *MapMarkSetEnv) Create(name string, sizeHint int64) (MarkSet, error) {
return e.create(name, sizeHint)
}
func (e *MapMarkSetEnv) CreateVisitor(name string, sizeHint int64) (MarkSetVisitor, error) {
return e.create(name, sizeHint)
}
func (e *MapMarkSetEnv) SupportsVisitor() bool { return true }
func (e *MapMarkSetEnv) Close() error {
return nil
}
func (s *MapMarkSet) Mark(cid cid.Cid) error {
if s.ts {
s.mx.Lock()
defer s.mx.Unlock()
}
s.mx.Lock()
defer s.mx.Unlock()
if s.set == nil {
return errMarkSetClosed
@ -59,10 +44,8 @@ func (s *MapMarkSet) Mark(cid cid.Cid) error {
}
func (s *MapMarkSet) Has(cid cid.Cid) (bool, error) {
if s.ts {
s.mx.RLock()
defer s.mx.RUnlock()
}
s.mx.RLock()
defer s.mx.RUnlock()
if s.set == nil {
return false, errMarkSetClosed
@ -73,10 +56,8 @@ func (s *MapMarkSet) Has(cid cid.Cid) (bool, error) {
}
func (s *MapMarkSet) Visit(c cid.Cid) (bool, error) {
if s.ts {
s.mx.Lock()
defer s.mx.Unlock()
}
s.mx.Lock()
defer s.mx.Unlock()
if s.set == nil {
return false, errMarkSetClosed
@ -92,14 +73,9 @@ func (s *MapMarkSet) Visit(c cid.Cid) (bool, error) {
}
func (s *MapMarkSet) Close() error {
if s.ts {
s.mx.Lock()
defer s.mx.Unlock()
}
s.mx.Lock()
defer s.mx.Unlock()
s.set = nil
return nil
}
func (s *MapMarkSet) SetConcurrent() {
s.ts = true
}

View File

@ -167,7 +167,7 @@ func testMarkSetVisitor(t *testing.T, lsType string) {
}
defer env.Close() //nolint:errcheck
visitor, err := env.CreateVisitor("test", 0)
visitor, err := env.Create("test", 0)
if err != nil {
t.Fatal(err)
}

View File

@ -186,10 +186,6 @@ func Open(path string, ds dstore.Datastore, hot, cold bstore.Blockstore, cfg *Co
return nil, err
}
if !markSetEnv.SupportsVisitor() {
return nil, xerrors.Errorf("markset type does not support atomic visitors")
}
// and now we can make a SplitStore
ss := &SplitStore{
cfg: cfg,

View File

@ -4,6 +4,7 @@ import (
"fmt"
"os"
"path/filepath"
"sync"
"sync/atomic"
"time"
@ -67,7 +68,10 @@ func (s *SplitStore) doCheck(curTs *types.TipSet) error {
}
defer output.Close() //nolint:errcheck
var mx sync.Mutex
write := func(format string, args ...interface{}) {
mx.Lock()
defer mx.Unlock()
_, err := fmt.Fprintf(output, format+"\n", args...)
if err != nil {
log.Warnf("error writing check output: %s", err)
@ -82,9 +86,10 @@ func (s *SplitStore) doCheck(curTs *types.TipSet) error {
write("compaction index: %d", s.compactionIndex)
write("--")
var coldCnt, missingCnt int64
coldCnt := new(int64)
missingCnt := new(int64)
visitor, err := s.markSetEnv.CreateVisitor("check", 0)
visitor, err := s.markSetEnv.Create("check", 0)
if err != nil {
return xerrors.Errorf("error creating visitor: %w", err)
}
@ -111,10 +116,10 @@ func (s *SplitStore) doCheck(curTs *types.TipSet) error {
}
if has {
coldCnt++
atomic.AddInt64(coldCnt, 1)
write("cold object reference: %s", c)
} else {
missingCnt++
atomic.AddInt64(missingCnt, 1)
write("missing object reference: %s", c)
return errStopWalk
}
@ -128,9 +133,9 @@ func (s *SplitStore) doCheck(curTs *types.TipSet) error {
return err
}
log.Infow("check done", "cold", coldCnt, "missing", missingCnt)
log.Infow("check done", "cold", *coldCnt, "missing", *missingCnt)
write("--")
write("cold: %d missing: %d", coldCnt, missingCnt)
write("cold: %d missing: %d", *coldCnt, *missingCnt)
write("DONE")
return nil

View File

@ -5,6 +5,7 @@ import (
"errors"
"runtime"
"sort"
"sync"
"sync/atomic"
"time"
@ -227,7 +228,7 @@ func (s *SplitStore) trackTxnRefMany(cids []cid.Cid) {
}
// protect all pending transactional references
func (s *SplitStore) protectTxnRefs(markSet MarkSetVisitor) error {
func (s *SplitStore) protectTxnRefs(markSet MarkSet) error {
for {
var txnRefs map[cid.Cid]struct{}
@ -299,14 +300,14 @@ func (s *SplitStore) protectTxnRefs(markSet MarkSetVisitor) error {
// transactionally protect a reference by walking the object and marking.
// concurrent markings are short circuited by checking the markset.
func (s *SplitStore) doTxnProtect(root cid.Cid, markSet MarkSetVisitor) error {
func (s *SplitStore) doTxnProtect(root cid.Cid, markSet MarkSet) error {
if err := s.checkClosing(); err != nil {
return err
}
// Note: cold objects are deleted heaviest first, so the consituents of an object
// cannot be deleted before the object itself.
return s.walkObjectIncomplete(root, tmpVisitor(),
return s.walkObjectIncomplete(root, newTmpVisitor(),
func(c cid.Cid) error {
if isUnitaryObject(c) {
return errStopWalk
@ -397,7 +398,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
log.Infow("running compaction", "currentEpoch", currentEpoch, "baseEpoch", s.baseEpoch, "boundaryEpoch", boundaryEpoch, "inclMsgsEpoch", inclMsgsEpoch, "compactionIndex", s.compactionIndex)
markSet, err := s.markSetEnv.CreateVisitor("live", s.markSetSize)
markSet, err := s.markSetEnv.Create("live", s.markSetSize)
if err != nil {
return xerrors.Errorf("error creating mark set: %w", err)
}
@ -602,8 +603,8 @@ func (s *SplitStore) beginTxnProtect() {
s.txnMissing = make(map[cid.Cid]struct{})
}
func (s *SplitStore) beginTxnMarking(markSet MarkSetVisitor) {
markSet.SetConcurrent()
func (s *SplitStore) beginTxnMarking(markSet MarkSet) {
log.Info("beginning transactional marking")
}
func (s *SplitStore) endTxnProtect() {
@ -621,26 +622,33 @@ func (s *SplitStore) endTxnProtect() {
func (s *SplitStore) walkChain(ts *types.TipSet, inclState, inclMsgs abi.ChainEpoch,
visitor ObjectVisitor, f func(cid.Cid) error) error {
var walked *cid.Set
toWalk := ts.Cids()
walkCnt := 0
scanCnt := 0
var walked ObjectVisitor
var mx sync.Mutex
// we copy the tipset first into a new slice, which allows us to reuse it in every epoch.
toWalk := make([]cid.Cid, len(ts.Cids()))
copy(toWalk, ts.Cids())
walkCnt := new(int64)
scanCnt := new(int64)
stopWalk := func(_ cid.Cid) error { return errStopWalk }
walkBlock := func(c cid.Cid) error {
if !walked.Visit(c) {
visit, err := walked.Visit(c)
if err != nil {
return err
}
if !visit {
return nil
}
walkCnt++
atomic.AddInt64(walkCnt, 1)
if err := f(c); err != nil {
return err
}
var hdr types.BlockHeader
err := s.view(c, func(data []byte) error {
err = s.view(c, func(data []byte) error {
return hdr.UnmarshalCBOR(bytes.NewBuffer(data))
})
@ -676,11 +684,13 @@ func (s *SplitStore) walkChain(ts *types.TipSet, inclState, inclMsgs abi.ChainEp
if err := s.walkObject(hdr.ParentStateRoot, visitor, f); err != nil {
return xerrors.Errorf("error walking state root (cid: %s): %w", hdr.ParentStateRoot, err)
}
scanCnt++
atomic.AddInt64(scanCnt, 1)
}
if hdr.Height > 0 {
mx.Lock()
toWalk = append(toWalk, hdr.Parents...)
mx.Unlock()
}
return nil
@ -692,20 +702,43 @@ func (s *SplitStore) walkChain(ts *types.TipSet, inclState, inclMsgs abi.ChainEp
return err
}
workers := len(toWalk)
if workers > runtime.NumCPU()/2 {
workers = runtime.NumCPU() / 2
}
if workers < 2 {
workers = 2
}
// the walk is BFS, so we can reset the walked set in every iteration and avoid building up
// a set that contains all blocks (1M epochs -> 5M blocks -> 200MB worth of memory and growing
// over time)
walked = cid.NewSet()
walking := toWalk
toWalk = nil
for _, c := range walking {
if err := walkBlock(c); err != nil {
return xerrors.Errorf("error walking block (cid: %s): %w", c, err)
}
walked = newConcurrentVisitor()
workch := make(chan cid.Cid, len(toWalk))
for _, c := range toWalk {
workch <- c
}
close(workch)
toWalk = toWalk[:0]
g := new(errgroup.Group)
for i := 0; i < workers; i++ {
g.Go(func() error {
for c := range workch {
if err := walkBlock(c); err != nil {
return xerrors.Errorf("error walking block (cid: %s): %w", c, err)
}
}
return nil
})
}
if err := g.Wait(); err != nil {
return err
}
}
log.Infow("chain walk done", "walked", walkCnt, "scanned", scanCnt)
log.Infow("chain walk done", "walked", *walkCnt, "scanned", *scanCnt)
return nil
}
@ -1011,7 +1044,7 @@ func (s *SplitStore) purgeBatch(cids []cid.Cid, deleteBatch func([]cid.Cid) erro
return nil
}
func (s *SplitStore) purge(cids []cid.Cid, markSet MarkSetVisitor) error {
func (s *SplitStore) purge(cids []cid.Cid, markSet MarkSet) error {
deadCids := make([]cid.Cid, 0, batchSize)
var purgeCnt, liveCnt int
defer func() {
@ -1077,7 +1110,7 @@ func (s *SplitStore) purge(cids []cid.Cid, markSet MarkSetVisitor) error {
// have this gem[TM].
// My best guess is that they are parent message receipts or yet to be computed state roots; magik
// thinks the cause may be block validation.
func (s *SplitStore) waitForMissingRefs(markSet MarkSetVisitor) {
func (s *SplitStore) waitForMissingRefs(markSet MarkSet) {
s.txnLk.Lock()
missing := s.txnMissing
s.txnMissing = nil
@ -1106,7 +1139,7 @@ func (s *SplitStore) waitForMissingRefs(markSet MarkSetVisitor) {
}
towalk := missing
visitor := tmpVisitor()
visitor := newTmpVisitor()
missing = make(map[cid.Cid]struct{})
for c := range towalk {

View File

@ -1,6 +1,7 @@
package splitstore
import (
"sync"
"sync/atomic"
"time"
@ -55,12 +56,13 @@ func (s *SplitStore) doWarmup(curTs *types.TipSet) error {
if WarmupBoundary < epoch {
boundaryEpoch = epoch - WarmupBoundary
}
var mx sync.Mutex
batchHot := make([]blocks.Block, 0, batchSize)
count := int64(0)
xcount := int64(0)
missing := int64(0)
count := new(int64)
xcount := new(int64)
missing := new(int64)
visitor, err := s.markSetEnv.CreateVisitor("warmup", 0)
visitor, err := s.markSetEnv.Create("warmup", 0)
if err != nil {
return xerrors.Errorf("error creating visitor: %w", err)
}
@ -73,7 +75,7 @@ func (s *SplitStore) doWarmup(curTs *types.TipSet) error {
return errStopWalk
}
count++
atomic.AddInt64(count, 1)
has, err := s.hot.Has(s.ctx, c)
if err != nil {
@ -87,22 +89,25 @@ func (s *SplitStore) doWarmup(curTs *types.TipSet) error {
blk, err := s.cold.Get(s.ctx, c)
if err != nil {
if err == bstore.ErrNotFound {
missing++
atomic.AddInt64(missing, 1)
return errStopWalk
}
return err
}
xcount++
atomic.AddInt64(xcount, 1)
mx.Lock()
batchHot = append(batchHot, blk)
if len(batchHot) == batchSize {
err = s.hot.PutMany(s.ctx, batchHot)
if err != nil {
mx.Unlock()
return err
}
batchHot = batchHot[:0]
}
mx.Unlock()
return nil
})
@ -118,9 +123,9 @@ func (s *SplitStore) doWarmup(curTs *types.TipSet) error {
}
}
log.Infow("warmup stats", "visited", count, "warm", xcount, "missing", missing)
log.Infow("warmup stats", "visited", *count, "warm", *xcount, "missing", *missing)
s.markSetSize = count + count>>2 // overestimate a bit
s.markSetSize = *count + *count>>2 // overestimate a bit
err = s.ds.Put(s.ctx, markSetSizeKey, int64ToBytes(s.markSetSize))
if err != nil {
log.Warnf("error saving mark set size: %s", err)

View File

@ -1,6 +1,8 @@
package splitstore
import (
"sync"
cid "github.com/ipfs/go-cid"
)
@ -17,16 +19,34 @@ func (v *noopVisitor) Visit(_ cid.Cid) (bool, error) {
return true, nil
}
type cidSetVisitor struct {
type tmpVisitor struct {
set *cid.Set
}
var _ ObjectVisitor = (*cidSetVisitor)(nil)
var _ ObjectVisitor = (*tmpVisitor)(nil)
func (v *cidSetVisitor) Visit(c cid.Cid) (bool, error) {
func (v *tmpVisitor) Visit(c cid.Cid) (bool, error) {
return v.set.Visit(c), nil
}
func tmpVisitor() ObjectVisitor {
return &cidSetVisitor{set: cid.NewSet()}
func newTmpVisitor() ObjectVisitor {
return &tmpVisitor{set: cid.NewSet()}
}
type concurrentVisitor struct {
mx sync.Mutex
set *cid.Set
}
var _ ObjectVisitor = (*concurrentVisitor)(nil)
func newConcurrentVisitor() *concurrentVisitor {
return &concurrentVisitor{set: cid.NewSet()}
}
func (v *concurrentVisitor) Visit(c cid.Cid) (bool, error) {
v.mx.Lock()
defer v.mx.Unlock()
return v.set.Visit(c), nil
}