Merge pull request #6949 from filecoin-project/fix/splitstore-memory-usage

Reduce splitstore memory usage during chain walks
This commit is contained in:
vyzo 2021-08-10 11:05:41 +03:00 committed by GitHub
commit c24d4e11d5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 472 additions and 295 deletions

View File

@ -21,15 +21,26 @@ type MarkSet interface {
SetConcurrent() SetConcurrent()
} }
// MarkSetVisitor is a MarkSet that also implements ObjectVisitor, i.e. it
// offers Visit in addition to the regular MarkSet methods.
type MarkSetVisitor interface {
MarkSet
ObjectVisitor
}
type MarkSetEnv interface { type MarkSetEnv interface {
// Create creates a new markset within the environment.
// name is a unique name for this markset, mapped to the filesystem in disk-backed environments
// sizeHint is a hint about the expected size of the markset
Create(name string, sizeHint int64) (MarkSet, error) Create(name string, sizeHint int64) (MarkSet, error)
// CreateVisitor is like Create, but returns a wider interface that supports atomic visits.
// It may not be supported by some markset types (e.g. bloom).
CreateVisitor(name string, sizeHint int64) (MarkSetVisitor, error)
// SupportsVisitor returns true if the marksets created by this environment support the visitor interface.
SupportsVisitor() bool
Close() error Close() error
} }
func OpenMarkSetEnv(path string, mtype string) (MarkSetEnv, error) { func OpenMarkSetEnv(path string, mtype string) (MarkSetEnv, error) {
switch mtype { switch mtype {
case "bloom":
return NewBloomMarkSetEnv()
case "map": case "map":
return NewMapMarkSetEnv() return NewMapMarkSetEnv()
case "badger": case "badger":

View File

@ -27,12 +27,14 @@ type BadgerMarkSet struct {
writing map[int]map[string]struct{} writing map[int]map[string]struct{}
writers int writers int
seqno int seqno int
version int
db *badger.DB db *badger.DB
path string path string
} }
var _ MarkSet = (*BadgerMarkSet)(nil) var _ MarkSet = (*BadgerMarkSet)(nil)
var _ MarkSetVisitor = (*BadgerMarkSet)(nil)
var badgerMarkSetBatchSize = 16384 var badgerMarkSetBatchSize = 16384
@ -46,9 +48,241 @@ func NewBadgerMarkSetEnv(path string) (MarkSetEnv, error) {
return &BadgerMarkSetEnv{path: msPath}, nil return &BadgerMarkSetEnv{path: msPath}, nil
} }
func (e *BadgerMarkSetEnv) Create(name string, sizeHint int64) (MarkSet, error) { func (e *BadgerMarkSetEnv) create(name string, sizeHint int64) (*BadgerMarkSet, error) {
name += ".tmp"
path := filepath.Join(e.path, name) path := filepath.Join(e.path, name)
db, err := openTransientBadgerDB(path)
if err != nil {
return nil, xerrors.Errorf("error creating badger db: %w", err)
}
ms := &BadgerMarkSet{
pend: make(map[string]struct{}),
writing: make(map[int]map[string]struct{}),
db: db,
path: path,
}
ms.cond.L = &ms.mx
return ms, nil
}
// Create creates a new badger-backed markset called name inside the environment.
// sizeHint is forwarded to create unchanged.
//
// On failure it returns a literal nil MarkSet together with the error, so the
// returned interface never wraps a nil *BadgerMarkSet.
func (e *BadgerMarkSetEnv) Create(name string, sizeHint int64) (MarkSet, error) {
	ms, err := e.create(name, sizeHint)
	if err != nil {
		// returning ms here would yield a non-nil interface holding a nil pointer
		return nil, err
	}
	return ms, nil
}
// CreateVisitor creates a new badger-backed markset called name and returns it
// through the wider MarkSetVisitor interface. sizeHint is forwarded to create
// unchanged.
//
// On failure it returns a literal nil MarkSetVisitor together with the error, so
// the returned interface never wraps a nil *BadgerMarkSet.
func (e *BadgerMarkSetEnv) CreateVisitor(name string, sizeHint int64) (MarkSetVisitor, error) {
	ms, err := e.create(name, sizeHint)
	if err != nil {
		// returning ms here would yield a non-nil interface holding a nil pointer
		return nil, err
	}
	return ms, nil
}
// SupportsVisitor always reports true for the badger environment.
func (e *BadgerMarkSetEnv) SupportsVisitor() bool { return true }
// Close removes the environment's directory (e.path) together with everything
// below it and returns the error from os.RemoveAll, if any.
func (e *BadgerMarkSetEnv) Close() error {
return os.RemoveAll(e.path)
}
// Mark adds the hash of c to the pending batch. When that fills the batch, the
// batch is flushed to the db before returning. It returns errMarkSetClosed if
// the markset has been closed.
func (s *BadgerMarkSet) Mark(c cid.Cid) error {
	s.mx.Lock()
	if s.pend == nil {
		s.mx.Unlock()
		return errMarkSetClosed
	}
	flush, batchNo := s.put(string(c.Hash()))
	s.mx.Unlock()

	if !flush {
		return nil
	}
	// write acquires the lock itself, so it has to run after we released it
	return s.write(batchNo)
}
// Has reports whether the hash of c is in the markset, looking first at the
// in-memory batches and then at the badger db. The read lock is held throughout.
func (s *BadgerMarkSet) Has(c cid.Cid) (bool, error) {
	s.mx.RLock()
	defer s.mx.RUnlock()

	hash := c.Hash()

	found, err := s.tryPending(string(hash))
	if err != nil || found {
		return found, err
	}

	return s.tryDB(hash)
}
// Visit checks whether the hash of c is already in the markset and adds it if not.
// It returns true when the key was newly added by this call, and false when it
// was already present (in the pending batch, in a batch being written, or in
// the db) or when a lookup failed. If adding the key fills the pending batch
// and flushing that batch fails, it returns true together with the write error.
func (s *BadgerMarkSet) Visit(c cid.Cid) (bool, error) {
key := c.Hash()
pendKey := string(key)
// first pass: look the key up while holding only the read lock
s.mx.RLock()
has, err := s.tryPending(pendKey)
if has || err != nil {
s.mx.RUnlock()
return false, err
}
has, err = s.tryDB(key)
if has || err != nil {
s.mx.RUnlock()
return false, err
}
// we need to upgrade the lock to exclusive in order to write; take the version count to see
// if there was another write while we were upgrading
version := s.version
s.mx.RUnlock()
s.mx.Lock()
// we have to do the check dance again
has, err = s.tryPending(pendKey)
if has || err != nil {
s.mx.Unlock()
return false, err
}
// the db lookup is only repeated when write bumped the version in between
if version != s.version {
// something was written to the db, we need to check it
has, err = s.tryDB(key)
if has || err != nil {
s.mx.Unlock()
return false, err
}
}
write, seqno := s.put(pendKey)
s.mx.Unlock()
// write takes the lock itself, hence it is called after the unlock
if write {
err = s.write(seqno)
}
return true, err
}
// tryPending reports whether key is in the current pending batch or in any of
// the batches registered in s.writing. It returns errMarkSetClosed when the
// pending batch is nil. The caller holds the (r)lock.
func (s *BadgerMarkSet) tryPending(key string) (has bool, err error) {
	if s.pend == nil {
		return false, errMarkSetClosed
	}

	if _, has = s.pend[key]; has {
		return true, nil
	}

	for _, batch := range s.writing {
		if _, has = batch[key]; has {
			return true, nil
		}
	}

	return false, nil
}
// tryDB looks key up in the badger db inside a read-only transaction.
// A missing key (badger.ErrKeyNotFound) is reported as (false, nil); any other
// error is returned to the caller as is.
func (s *BadgerMarkSet) tryDB(key []byte) (has bool, err error) {
	lookup := func(txn *badger.Txn) error {
		_, getErr := txn.Get(key)
		return getErr
	}

	err = s.db.View(lookup)
	if err == nil {
		return true, nil
	}
	if err == badger.ErrKeyNotFound {
		return false, nil
	}
	return false, err
}
// put adds key to the pending batch. Once the batch holds at least
// badgerMarkSetBatchSize keys it is moved into s.writing under a fresh sequence
// number and replaced by an empty batch; in that case put returns true and the
// sequence number to flush. Otherwise it returns (false, 0).
// The caller holds the exclusive lock.
func (s *BadgerMarkSet) put(key string) (write bool, seqno int) {
	s.pend[key] = struct{}{}

	if len(s.pend) >= badgerMarkSetBatchSize {
		full := s.pend
		seqno = s.seqno
		s.writing[seqno] = full
		s.seqno = seqno + 1
		s.pend = make(map[string]struct{})
		return true, seqno
	}

	return false, 0
}
// write flushes the batch registered under seqno in s.writing to the badger db.
// It acquires s.mx itself (both on entry and in the deferred cleanup), so it
// must be called without the lock held. It returns errMarkSetClosed if the
// markset was closed before the write started, or a wrapped error if building
// or flushing the badger write batch fails.
func (s *BadgerMarkSet) write(seqno int) (err error) {
s.mx.Lock()
if s.pend == nil {
s.mx.Unlock()
return errMarkSetClosed
}
pend := s.writing[seqno]
// register as an active writer; Close waits until this count drops to zero
s.writers++
s.mx.Unlock()
defer func() {
s.mx.Lock()
defer s.mx.Unlock()
// the batch is only dropped from memory (and the version that Visit compares
// bumped) after a successful flush; on error it stays in s.writing
if err == nil {
delete(s.writing, seqno)
s.version++
}
s.writers--
if s.writers == 0 {
s.cond.Broadcast()
}
}()
empty := []byte{} // not nil
batch := s.db.NewWriteBatch()
defer batch.Cancel()
for k := range pend {
if err = batch.Set([]byte(k), empty); err != nil {
return xerrors.Errorf("error setting batch: %w", err)
}
}
err = batch.Flush()
if err != nil {
return xerrors.Errorf("error flushing batch to badger markset: %w", err)
}
return nil
}
// Close shuts the markset down: it waits for in-flight writers to finish,
// marks the set as closed by clearing the pending batch and the db handle, and
// hands the db and its path to closeTransientBadgerDB. Closing an already
// closed markset is a no-op that returns nil.
func (s *BadgerMarkSet) Close() error {
	s.mx.Lock()
	defer s.mx.Unlock()

	// a nil pending batch is the "closed" marker
	if s.pend == nil {
		return nil
	}

	for s.writers > 0 {
		s.cond.Wait()
	}

	handle, dir := s.db, s.path
	s.pend, s.db = nil, nil

	return closeTransientBadgerDB(handle, dir)
}
// SetConcurrent is a no-op for BadgerMarkSet.
func (s *BadgerMarkSet) SetConcurrent() {}
func openTransientBadgerDB(path string) (*badger.DB, error) {
// clean up first // clean up first
err := os.RemoveAll(path) err := os.RemoveAll(path)
if err != nil { if err != nil {
@ -76,140 +310,16 @@ func (e *BadgerMarkSetEnv) Create(name string, sizeHint int64) (MarkSet, error)
skip2: log.Desugar().WithOptions(zap.AddCallerSkip(2)).Sugar(), skip2: log.Desugar().WithOptions(zap.AddCallerSkip(2)).Sugar(),
} }
db, err := badger.Open(opts) return badger.Open(opts)
if err != nil {
return nil, xerrors.Errorf("error creating badger markset: %w", err)
}
ms := &BadgerMarkSet{
pend: make(map[string]struct{}),
writing: make(map[int]map[string]struct{}),
db: db,
path: path,
}
ms.cond.L = &ms.mx
return ms, nil
} }
func (e *BadgerMarkSetEnv) Close() error { func closeTransientBadgerDB(db *badger.DB, path string) error {
return os.RemoveAll(e.path)
}
func (s *BadgerMarkSet) Mark(c cid.Cid) error {
s.mx.Lock()
if s.pend == nil {
s.mx.Unlock()
return errMarkSetClosed
}
s.pend[string(c.Hash())] = struct{}{}
if len(s.pend) < badgerMarkSetBatchSize {
s.mx.Unlock()
return nil
}
pend := s.pend
seqno := s.seqno
s.seqno++
s.writing[seqno] = pend
s.pend = make(map[string]struct{})
s.writers++
s.mx.Unlock()
defer func() {
s.mx.Lock()
defer s.mx.Unlock()
delete(s.writing, seqno)
s.writers--
if s.writers == 0 {
s.cond.Broadcast()
}
}()
empty := []byte{} // not nil
batch := s.db.NewWriteBatch()
defer batch.Cancel()
for k := range pend {
if err := batch.Set([]byte(k), empty); err != nil {
return err
}
}
err := batch.Flush()
if err != nil {
return xerrors.Errorf("error flushing batch to badger markset: %w", err)
}
return nil
}
func (s *BadgerMarkSet) Has(c cid.Cid) (bool, error) {
s.mx.RLock()
defer s.mx.RUnlock()
if s.pend == nil {
return false, errMarkSetClosed
}
key := c.Hash()
pendKey := string(key)
_, ok := s.pend[pendKey]
if ok {
return true, nil
}
for _, wr := range s.writing {
_, ok := wr[pendKey]
if ok {
return true, nil
}
}
err := s.db.View(func(txn *badger.Txn) error {
_, err := txn.Get(key)
return err
})
switch err {
case nil:
return true, nil
case badger.ErrKeyNotFound:
return false, nil
default:
return false, xerrors.Errorf("error checking badger markset: %w", err)
}
}
func (s *BadgerMarkSet) Close() error {
s.mx.Lock()
defer s.mx.Unlock()
if s.pend == nil {
return nil
}
for s.writers > 0 {
s.cond.Wait()
}
s.pend = nil
db := s.db
s.db = nil
err := db.Close() err := db.Close()
if err != nil { if err != nil {
return xerrors.Errorf("error closing badger markset: %w", err) return xerrors.Errorf("error closing badger markset: %w", err)
} }
err = os.RemoveAll(s.path) err = os.RemoveAll(path)
if err != nil { if err != nil {
return xerrors.Errorf("error deleting badger markset: %w", err) return xerrors.Errorf("error deleting badger markset: %w", err)
} }
@ -217,8 +327,6 @@ func (s *BadgerMarkSet) Close() error {
return nil return nil
} }
func (s *BadgerMarkSet) SetConcurrent() {}
// badger logging through go-log // badger logging through go-log
type badgerLogger struct { type badgerLogger struct {
*zap.SugaredLogger *zap.SugaredLogger

View File

@ -1,107 +0,0 @@
package splitstore
import (
"crypto/rand"
"crypto/sha256"
"sync"
"golang.org/x/xerrors"
bbloom "github.com/ipfs/bbloom"
cid "github.com/ipfs/go-cid"
)
const (
BloomFilterMinSize = 10_000_000
BloomFilterProbability = 0.01
)
// BloomMarkSetEnv is a stateless MarkSetEnv whose marksets are bloom filters.
type BloomMarkSetEnv struct{}
var _ MarkSetEnv = (*BloomMarkSetEnv)(nil)
// BloomMarkSet is a MarkSet backed by a salted bloom filter.
type BloomMarkSet struct {
// salt is prepended to every cid hash before it is re-hashed (see saltedKey)
salt []byte
// mx is only taken when ts is set
mx sync.RWMutex
// bf is the underlying filter; Close sets it to nil
bf *bbloom.Bloom
// ts enables locking; it is switched on by SetConcurrent
ts bool
}
var _ MarkSet = (*BloomMarkSet)(nil)
// NewBloomMarkSetEnv returns a new, empty BloomMarkSetEnv; it never fails.
func NewBloomMarkSetEnv() (*BloomMarkSetEnv, error) {
return &BloomMarkSetEnv{}, nil
}
// Create builds a new bloom-filter markset. The filter is sized to the smallest
// multiple of BloomFilterMinSize that is at least sizeHint (and never less than
// BloomFilterMinSize), and each markset gets its own random 4-byte salt.
// name is not used.
func (e *BloomMarkSetEnv) Create(name string, sizeHint int64) (MarkSet, error) {
	var capacity int64 = BloomFilterMinSize
	for capacity < sizeHint {
		capacity += BloomFilterMinSize
	}

	salt := make([]byte, 4)
	if _, err := rand.Read(salt); err != nil {
		return nil, xerrors.Errorf("error reading salt: %w", err)
	}

	filter, err := bbloom.New(float64(capacity), BloomFilterProbability)
	if err != nil {
		return nil, xerrors.Errorf("error creating bloom filter: %w", err)
	}

	return &BloomMarkSet{salt: salt, bf: filter}, nil
}
// Close does nothing and always returns nil.
func (e *BloomMarkSetEnv) Close() error {
return nil
}
// saltedKey derives the bloom filter key for cid: the SHA-256 digest of the
// markset's salt followed by the cid's hash.
func (s *BloomMarkSet) saltedKey(cid cid.Cid) []byte {
	hash := cid.Hash()

	buf := make([]byte, 0, len(s.salt)+len(hash))
	buf = append(buf, s.salt...)
	buf = append(buf, hash...)

	digest := sha256.Sum256(buf)
	return digest[:]
}
// Mark adds the salted key of cid to the bloom filter. The write lock is taken
// only in concurrent mode. It returns errMarkSetClosed once the set is closed.
func (s *BloomMarkSet) Mark(cid cid.Cid) error {
	if s.ts {
		s.mx.Lock()
		defer s.mx.Unlock()
	}

	filter := s.bf
	if filter == nil {
		return errMarkSetClosed
	}

	key := s.saltedKey(cid)
	filter.Add(key)
	return nil
}
// Has reports whether the bloom filter contains the salted key of cid. The read
// lock is taken only in concurrent mode. It returns errMarkSetClosed once the
// set is closed.
func (s *BloomMarkSet) Has(cid cid.Cid) (bool, error) {
	if s.ts {
		s.mx.RLock()
		defer s.mx.RUnlock()
	}

	filter := s.bf
	if filter == nil {
		return false, errMarkSetClosed
	}

	key := s.saltedKey(cid)
	return filter.Has(key), nil
}
// Close drops the reference to the bloom filter, after which Mark and Has
// return errMarkSetClosed. It always returns nil.
func (s *BloomMarkSet) Close() error {
// the lock is only taken in concurrent mode
if s.ts {
s.mx.Lock()
defer s.mx.Unlock()
}
s.bf = nil
return nil
}
// SetConcurrent switches the markset into concurrent mode, in which Mark, Has
// and Close take s.mx. The flag itself is written without holding the lock.
func (s *BloomMarkSet) SetConcurrent() {
s.ts = true
}

View File

@ -18,17 +18,28 @@ type MapMarkSet struct {
} }
var _ MarkSet = (*MapMarkSet)(nil) var _ MarkSet = (*MapMarkSet)(nil)
var _ MarkSetVisitor = (*MapMarkSet)(nil)
func NewMapMarkSetEnv() (*MapMarkSetEnv, error) { func NewMapMarkSetEnv() (*MapMarkSetEnv, error) {
return &MapMarkSetEnv{}, nil return &MapMarkSetEnv{}, nil
} }
func (e *MapMarkSetEnv) Create(name string, sizeHint int64) (MarkSet, error) { func (e *MapMarkSetEnv) create(name string, sizeHint int64) (*MapMarkSet, error) {
return &MapMarkSet{ return &MapMarkSet{
set: make(map[string]struct{}, sizeHint), set: make(map[string]struct{}, sizeHint),
}, nil }, nil
} }
// Create returns a new map-backed markset as a MarkSet; it delegates to create.
func (e *MapMarkSetEnv) Create(name string, sizeHint int64) (MarkSet, error) {
return e.create(name, sizeHint)
}
// CreateVisitor returns a new map-backed markset as a MarkSetVisitor; it
// delegates to create.
func (e *MapMarkSetEnv) CreateVisitor(name string, sizeHint int64) (MarkSetVisitor, error) {
return e.create(name, sizeHint)
}
// SupportsVisitor always reports true for the map environment.
func (e *MapMarkSetEnv) SupportsVisitor() bool { return true }
func (e *MapMarkSetEnv) Close() error { func (e *MapMarkSetEnv) Close() error {
return nil return nil
} }
@ -61,6 +72,25 @@ func (s *MapMarkSet) Has(cid cid.Cid) (bool, error) {
return ok, nil return ok, nil
} }
// Visit adds the hash of c to the set if it is not there yet. It returns true
// when the key was newly added and false when it was already present. The lock
// is taken only in concurrent mode. It returns errMarkSetClosed once the set
// is closed.
func (s *MapMarkSet) Visit(c cid.Cid) (bool, error) {
	if s.ts {
		s.mx.Lock()
		defer s.mx.Unlock()
	}

	if s.set == nil {
		return false, errMarkSetClosed
	}

	key := string(c.Hash())
	_, seen := s.set[key]
	if !seen {
		s.set[key] = struct{}{}
	}
	return !seen, nil
}
func (s *MapMarkSet) Close() error { func (s *MapMarkSet) Close() error {
if s.ts { if s.ts {
s.mx.Lock() s.mx.Lock()

View File

@ -2,6 +2,7 @@ package splitstore
import ( import (
"io/ioutil" "io/ioutil"
"os"
"testing" "testing"
cid "github.com/ipfs/go-cid" cid "github.com/ipfs/go-cid"
@ -10,10 +11,7 @@ import (
func TestMapMarkSet(t *testing.T) { func TestMapMarkSet(t *testing.T) {
testMarkSet(t, "map") testMarkSet(t, "map")
} testMarkSetVisitor(t, "map")
func TestBloomMarkSet(t *testing.T) {
testMarkSet(t, "bloom")
} }
func TestBadgerMarkSet(t *testing.T) { func TestBadgerMarkSet(t *testing.T) {
@ -23,16 +21,21 @@ func TestBadgerMarkSet(t *testing.T) {
badgerMarkSetBatchSize = bs badgerMarkSetBatchSize = bs
}) })
testMarkSet(t, "badger") testMarkSet(t, "badger")
testMarkSetVisitor(t, "badger")
} }
func testMarkSet(t *testing.T, lsType string) { func testMarkSet(t *testing.T, lsType string) {
t.Helper() t.Helper()
path, err := ioutil.TempDir("", "sweep-test.*") path, err := ioutil.TempDir("", "markset.*")
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
t.Cleanup(func() {
_ = os.RemoveAll(path)
})
env, err := OpenMarkSetEnv(path, lsType) env, err := OpenMarkSetEnv(path, lsType)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -145,3 +148,74 @@ func testMarkSet(t *testing.T, lsType string) {
t.Fatal(err) t.Fatal(err)
} }
} }
// testMarkSetVisitor checks the Visit contract of the markset type lsType:
// the first Visit of each key must report true, and a second Visit of the same
// key must report false.
func testMarkSetVisitor(t *testing.T, lsType string) {
	t.Helper()

	dir, err := ioutil.TempDir("", "markset.*")
	if err != nil {
		t.Fatal(err)
	}
	t.Cleanup(func() {
		_ = os.RemoveAll(dir)
	})

	env, err := OpenMarkSetEnv(dir, lsType)
	if err != nil {
		t.Fatal(err)
	}
	defer env.Close() //nolint:errcheck

	visitor, err := env.CreateVisitor("test", 0)
	if err != nil {
		t.Fatal(err)
	}
	defer visitor.Close() //nolint:errcheck

	newCid := func(key string) cid.Cid {
		h, err := multihash.Sum([]byte(key), multihash.SHA2_256, -1)
		if err != nil {
			t.Fatal(err)
		}
		return cid.NewCidV1(cid.Raw, h)
	}

	// expectVisit visits c and fails the test unless the outcome equals want
	expectVisit := func(v ObjectVisitor, c cid.Cid, want bool) {
		got, err := v.Visit(c)
		if err != nil {
			t.Fatal(err)
		}
		switch {
		case want && !got:
			t.Fatal("object should be visited")
		case !want && got:
			t.Fatal("unexpected visit")
		}
	}

	keys := []cid.Cid{newCid("a"), newCid("b"), newCid("c"), newCid("d")}

	// first pass: every key is new
	for _, k := range keys {
		expectVisit(visitor, k, true)
	}
	// second pass: every key has been seen already
	for _, k := range keys {
		expectVisit(visitor, k, false)
	}
}

View File

@ -142,7 +142,6 @@ type SplitStore struct {
txnViews int txnViews int
txnViewsWaiting bool txnViewsWaiting bool
txnActive bool txnActive bool
txnProtect MarkSet
txnRefsMx sync.Mutex txnRefsMx sync.Mutex
txnRefs map[cid.Cid]struct{} txnRefs map[cid.Cid]struct{}
txnMissing map[cid.Cid]struct{} txnMissing map[cid.Cid]struct{}
@ -174,6 +173,10 @@ func Open(path string, ds dstore.Datastore, hot, cold bstore.Blockstore, cfg *Co
return nil, err return nil, err
} }
if !markSetEnv.SupportsVisitor() {
return nil, xerrors.Errorf("markset type does not support atomic visitors")
}
// and now we can make a SplitStore // and now we can make a SplitStore
ss := &SplitStore{ ss := &SplitStore{
cfg: cfg, cfg: cfg,

View File

@ -83,7 +83,14 @@ func (s *SplitStore) doCheck(curTs *types.TipSet) error {
write("--") write("--")
var coldCnt, missingCnt int64 var coldCnt, missingCnt int64
err = s.walkChain(curTs, boundaryEpoch, boundaryEpoch,
visitor, err := s.markSetEnv.CreateVisitor("check", 0)
if err != nil {
return xerrors.Errorf("error creating visitor: %w", err)
}
defer visitor.Close() //nolint
err = s.walkChain(curTs, boundaryEpoch, boundaryEpoch, visitor,
func(c cid.Cid) error { func(c cid.Cid) error {
if isUnitaryObject(c) { if isUnitaryObject(c) {
return errStopWalk return errStopWalk

View File

@ -211,7 +211,7 @@ func (s *SplitStore) trackTxnRefMany(cids []cid.Cid) {
} }
// protect all pending transactional references // protect all pending transactional references
func (s *SplitStore) protectTxnRefs(markSet MarkSet) error { func (s *SplitStore) protectTxnRefs(markSet MarkSetVisitor) error {
for { for {
var txnRefs map[cid.Cid]struct{} var txnRefs map[cid.Cid]struct{}
@ -283,30 +283,29 @@ func (s *SplitStore) protectTxnRefs(markSet MarkSet) error {
// transactionally protect a reference by walking the object and marking. // transactionally protect a reference by walking the object and marking.
// concurrent markings are short circuited by checking the markset. // concurrent markings are short circuited by checking the markset.
func (s *SplitStore) doTxnProtect(root cid.Cid, markSet MarkSet) error { func (s *SplitStore) doTxnProtect(root cid.Cid, markSet MarkSetVisitor) error {
if err := s.checkClosing(); err != nil { if err := s.checkClosing(); err != nil {
return err return err
} }
// Note: cold objects are deleted heaviest first, so the constituents of an object // Note: cold objects are deleted heaviest first, so the constituents of an object
// cannot be deleted before the object itself. // cannot be deleted before the object itself.
return s.walkObjectIncomplete(root, cid.NewSet(), return s.walkObjectIncomplete(root, tmpVisitor(),
func(c cid.Cid) error { func(c cid.Cid) error {
if isUnitaryObject(c) { if isUnitaryObject(c) {
return errStopWalk return errStopWalk
} }
mark, err := markSet.Has(c) visit, err := markSet.Visit(c)
if err != nil { if err != nil {
return xerrors.Errorf("error checking markset: %w", err) return xerrors.Errorf("error visiting object: %w", err)
} }
// it's marked, nothing to do if !visit {
if mark {
return errStopWalk return errStopWalk
} }
return markSet.Mark(c) return nil
}, },
func(c cid.Cid) error { func(c cid.Cid) error {
if s.txnMissing != nil { if s.txnMissing != nil {
@ -382,7 +381,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
log.Infow("running compaction", "currentEpoch", currentEpoch, "baseEpoch", s.baseEpoch, "boundaryEpoch", boundaryEpoch, "inclMsgsEpoch", inclMsgsEpoch, "compactionIndex", s.compactionIndex) log.Infow("running compaction", "currentEpoch", currentEpoch, "baseEpoch", s.baseEpoch, "boundaryEpoch", boundaryEpoch, "inclMsgsEpoch", inclMsgsEpoch, "compactionIndex", s.compactionIndex)
markSet, err := s.markSetEnv.Create("live", s.markSetSize) markSet, err := s.markSetEnv.CreateVisitor("live", s.markSetSize)
if err != nil { if err != nil {
return xerrors.Errorf("error creating mark set: %w", err) return xerrors.Errorf("error creating mark set: %w", err)
} }
@ -410,14 +409,23 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
startMark := time.Now() startMark := time.Now()
var count int64 var count int64
err = s.walkChain(curTs, boundaryEpoch, inclMsgsEpoch, err = s.walkChain(curTs, boundaryEpoch, inclMsgsEpoch, &noopVisitor{},
func(c cid.Cid) error { func(c cid.Cid) error {
if isUnitaryObject(c) { if isUnitaryObject(c) {
return errStopWalk return errStopWalk
} }
visit, err := markSet.Visit(c)
if err != nil {
return xerrors.Errorf("error visiting object: %w", err)
}
if !visit {
return errStopWalk
}
count++ count++
return markSet.Mark(c) return nil
}) })
if err != nil { if err != nil {
@ -578,12 +586,8 @@ func (s *SplitStore) beginTxnProtect() {
s.txnMissing = make(map[cid.Cid]struct{}) s.txnMissing = make(map[cid.Cid]struct{})
} }
func (s *SplitStore) beginTxnMarking(markSet MarkSet) { func (s *SplitStore) beginTxnMarking(markSet MarkSetVisitor) {
markSet.SetConcurrent() markSet.SetConcurrent()
s.txnLk.Lock()
s.txnProtect = markSet
s.txnLk.Unlock()
} }
func (s *SplitStore) endTxnProtect() { func (s *SplitStore) endTxnProtect() {
@ -594,21 +598,14 @@ func (s *SplitStore) endTxnProtect() {
return return
} }
// release markset memory
if s.txnProtect != nil {
_ = s.txnProtect.Close()
}
s.txnActive = false s.txnActive = false
s.txnProtect = nil
s.txnRefs = nil s.txnRefs = nil
s.txnMissing = nil s.txnMissing = nil
} }
func (s *SplitStore) walkChain(ts *types.TipSet, inclState, inclMsgs abi.ChainEpoch, func (s *SplitStore) walkChain(ts *types.TipSet, inclState, inclMsgs abi.ChainEpoch,
f func(cid.Cid) error) error { visitor ObjectVisitor, f func(cid.Cid) error) error {
visited := cid.NewSet() var walked *cid.Set
walked := cid.NewSet()
toWalk := ts.Cids() toWalk := ts.Cids()
walkCnt := 0 walkCnt := 0
scanCnt := 0 scanCnt := 0
@ -616,7 +613,7 @@ func (s *SplitStore) walkChain(ts *types.TipSet, inclState, inclMsgs abi.ChainEp
stopWalk := func(_ cid.Cid) error { return errStopWalk } stopWalk := func(_ cid.Cid) error { return errStopWalk }
walkBlock := func(c cid.Cid) error { walkBlock := func(c cid.Cid) error {
if !visited.Visit(c) { if !walked.Visit(c) {
return nil return nil
} }
@ -640,19 +637,19 @@ func (s *SplitStore) walkChain(ts *types.TipSet, inclState, inclMsgs abi.ChainEp
if inclMsgs < inclState { if inclMsgs < inclState {
// we need to use walkObjectIncomplete here, as messages/receipts may be missing early on if we // we need to use walkObjectIncomplete here, as messages/receipts may be missing early on if we
// synced from snapshot and have a long HotStoreMessageRetentionPolicy. // synced from snapshot and have a long HotStoreMessageRetentionPolicy.
if err := s.walkObjectIncomplete(hdr.Messages, walked, f, stopWalk); err != nil { if err := s.walkObjectIncomplete(hdr.Messages, visitor, f, stopWalk); err != nil {
return xerrors.Errorf("error walking messages (cid: %s): %w", hdr.Messages, err) return xerrors.Errorf("error walking messages (cid: %s): %w", hdr.Messages, err)
} }
if err := s.walkObjectIncomplete(hdr.ParentMessageReceipts, walked, f, stopWalk); err != nil { if err := s.walkObjectIncomplete(hdr.ParentMessageReceipts, visitor, f, stopWalk); err != nil {
return xerrors.Errorf("error walking messages receipts (cid: %s): %w", hdr.ParentMessageReceipts, err) return xerrors.Errorf("error walking messages receipts (cid: %s): %w", hdr.ParentMessageReceipts, err)
} }
} else { } else {
if err := s.walkObject(hdr.Messages, walked, f); err != nil { if err := s.walkObject(hdr.Messages, visitor, f); err != nil {
return xerrors.Errorf("error walking messages (cid: %s): %w", hdr.Messages, err) return xerrors.Errorf("error walking messages (cid: %s): %w", hdr.Messages, err)
} }
if err := s.walkObject(hdr.ParentMessageReceipts, walked, f); err != nil { if err := s.walkObject(hdr.ParentMessageReceipts, visitor, f); err != nil {
return xerrors.Errorf("error walking message receipts (cid: %s): %w", hdr.ParentMessageReceipts, err) return xerrors.Errorf("error walking message receipts (cid: %s): %w", hdr.ParentMessageReceipts, err)
} }
} }
@ -660,7 +657,7 @@ func (s *SplitStore) walkChain(ts *types.TipSet, inclState, inclMsgs abi.ChainEp
// state is only retained if within the inclState boundary, with the exception of genesis // state is only retained if within the inclState boundary, with the exception of genesis
if hdr.Height >= inclState || hdr.Height == 0 { if hdr.Height >= inclState || hdr.Height == 0 {
if err := s.walkObject(hdr.ParentStateRoot, walked, f); err != nil { if err := s.walkObject(hdr.ParentStateRoot, visitor, f); err != nil {
return xerrors.Errorf("error walking state root (cid: %s): %w", hdr.ParentStateRoot, err) return xerrors.Errorf("error walking state root (cid: %s): %w", hdr.ParentStateRoot, err)
} }
scanCnt++ scanCnt++
@ -679,6 +676,10 @@ func (s *SplitStore) walkChain(ts *types.TipSet, inclState, inclMsgs abi.ChainEp
return err return err
} }
// the walk is BFS, so we can reset the walked set in every iteration and avoid building up
// a set that contains all blocks (1M epochs -> 5M blocks -> 200MB worth of memory and growing
// over time)
walked = cid.NewSet()
walking := toWalk walking := toWalk
toWalk = nil toWalk = nil
for _, c := range walking { for _, c := range walking {
@ -693,8 +694,13 @@ func (s *SplitStore) walkChain(ts *types.TipSet, inclState, inclMsgs abi.ChainEp
return nil return nil
} }
func (s *SplitStore) walkObject(c cid.Cid, walked *cid.Set, f func(cid.Cid) error) error { func (s *SplitStore) walkObject(c cid.Cid, visitor ObjectVisitor, f func(cid.Cid) error) error {
if !walked.Visit(c) { visit, err := visitor.Visit(c)
if err != nil {
return xerrors.Errorf("error visiting object: %w", err)
}
if !visit {
return nil return nil
} }
@ -716,7 +722,7 @@ func (s *SplitStore) walkObject(c cid.Cid, walked *cid.Set, f func(cid.Cid) erro
} }
var links []cid.Cid var links []cid.Cid
err := s.view(c, func(data []byte) error { err = s.view(c, func(data []byte) error {
return cbg.ScanForLinks(bytes.NewReader(data), func(c cid.Cid) { return cbg.ScanForLinks(bytes.NewReader(data), func(c cid.Cid) {
links = append(links, c) links = append(links, c)
}) })
@ -727,7 +733,7 @@ func (s *SplitStore) walkObject(c cid.Cid, walked *cid.Set, f func(cid.Cid) erro
} }
for _, c := range links { for _, c := range links {
err := s.walkObject(c, walked, f) err := s.walkObject(c, visitor, f)
if err != nil { if err != nil {
return xerrors.Errorf("error walking link (cid: %s): %w", c, err) return xerrors.Errorf("error walking link (cid: %s): %w", c, err)
} }
@ -737,8 +743,13 @@ func (s *SplitStore) walkObject(c cid.Cid, walked *cid.Set, f func(cid.Cid) erro
} }
// like walkObject, but the object may be potentially incomplete (references missing) // like walkObject, but the object may be potentially incomplete (references missing)
func (s *SplitStore) walkObjectIncomplete(c cid.Cid, walked *cid.Set, f, missing func(cid.Cid) error) error { func (s *SplitStore) walkObjectIncomplete(c cid.Cid, visitor ObjectVisitor, f, missing func(cid.Cid) error) error {
if !walked.Visit(c) { visit, err := visitor.Visit(c)
if err != nil {
return xerrors.Errorf("error visiting object: %w", err)
}
if !visit {
return nil return nil
} }
@ -777,7 +788,7 @@ func (s *SplitStore) walkObjectIncomplete(c cid.Cid, walked *cid.Set, f, missing
} }
var links []cid.Cid var links []cid.Cid
err := s.view(c, func(data []byte) error { err = s.view(c, func(data []byte) error {
return cbg.ScanForLinks(bytes.NewReader(data), func(c cid.Cid) { return cbg.ScanForLinks(bytes.NewReader(data), func(c cid.Cid) {
links = append(links, c) links = append(links, c)
}) })
@ -788,7 +799,7 @@ func (s *SplitStore) walkObjectIncomplete(c cid.Cid, walked *cid.Set, f, missing
} }
for _, c := range links { for _, c := range links {
err := s.walkObjectIncomplete(c, walked, f, missing) err := s.walkObjectIncomplete(c, visitor, f, missing)
if err != nil { if err != nil {
return xerrors.Errorf("error walking link (cid: %s): %w", c, err) return xerrors.Errorf("error walking link (cid: %s): %w", c, err)
} }
@ -984,7 +995,7 @@ func (s *SplitStore) purgeBatch(cids []cid.Cid, deleteBatch func([]cid.Cid) erro
return nil return nil
} }
func (s *SplitStore) purge(cids []cid.Cid, markSet MarkSet) error { func (s *SplitStore) purge(cids []cid.Cid, markSet MarkSetVisitor) error {
deadCids := make([]cid.Cid, 0, batchSize) deadCids := make([]cid.Cid, 0, batchSize)
var purgeCnt, liveCnt int var purgeCnt, liveCnt int
defer func() { defer func() {
@ -1050,7 +1061,7 @@ func (s *SplitStore) purge(cids []cid.Cid, markSet MarkSet) error {
// have this gem[TM]. // have this gem[TM].
// My best guess is that they are parent message receipts or yet to be computed state roots; magik // My best guess is that they are parent message receipts or yet to be computed state roots; magik
// thinks the cause may be block validation. // thinks the cause may be block validation.
func (s *SplitStore) waitForMissingRefs(markSet MarkSet) { func (s *SplitStore) waitForMissingRefs(markSet MarkSetVisitor) {
s.txnLk.Lock() s.txnLk.Lock()
missing := s.txnMissing missing := s.txnMissing
s.txnMissing = nil s.txnMissing = nil
@ -1079,27 +1090,27 @@ func (s *SplitStore) waitForMissingRefs(markSet MarkSet) {
} }
towalk := missing towalk := missing
walked := cid.NewSet() visitor := tmpVisitor()
missing = make(map[cid.Cid]struct{}) missing = make(map[cid.Cid]struct{})
for c := range towalk { for c := range towalk {
err := s.walkObjectIncomplete(c, walked, err := s.walkObjectIncomplete(c, visitor,
func(c cid.Cid) error { func(c cid.Cid) error {
if isUnitaryObject(c) { if isUnitaryObject(c) {
return errStopWalk return errStopWalk
} }
mark, err := markSet.Has(c) visit, err := markSet.Visit(c)
if err != nil { if err != nil {
return xerrors.Errorf("error checking markset for %s: %w", c, err) return xerrors.Errorf("error visiting object: %w", err)
} }
if mark { if !visit {
return errStopWalk return errStopWalk
} }
count++ count++
return markSet.Mark(c) return nil
}, },
func(c cid.Cid) error { func(c cid.Cid) error {
missing[c] = struct{}{} missing[c] = struct{}{}

View File

@ -59,7 +59,15 @@ func (s *SplitStore) doWarmup(curTs *types.TipSet) error {
count := int64(0) count := int64(0)
xcount := int64(0) xcount := int64(0)
missing := int64(0) missing := int64(0)
err := s.walkChain(curTs, boundaryEpoch, epoch+1, // we don't load messages/receipts in warmup
visitor, err := s.markSetEnv.CreateVisitor("warmup", 0)
if err != nil {
return xerrors.Errorf("error creating visitor: %w", err)
}
defer visitor.Close() //nolint
err = s.walkChain(curTs, boundaryEpoch, epoch+1, // we don't load messages/receipts in warmup
visitor,
func(c cid.Cid) error { func(c cid.Cid) error {
if isUnitaryObject(c) { if isUnitaryObject(c) {
return errStopWalk return errStopWalk

View File

@ -0,0 +1,32 @@
package splitstore
import (
cid "github.com/ipfs/go-cid"
)
// ObjectVisitor is an interface for deduplicating objects during walks.
// Visit returns true when the object should be processed and false when it
// should be skipped (e.g. because it has been seen before); a non-nil error
// reports a failure of the underlying bookkeeping.
type ObjectVisitor interface {
Visit(cid.Cid) (bool, error)
}
// noopVisitor is an ObjectVisitor that does no deduplication at all.
type noopVisitor struct{}
var _ ObjectVisitor = (*noopVisitor)(nil)
// Visit always returns true and never fails.
func (v *noopVisitor) Visit(_ cid.Cid) (bool, error) {
return true, nil
}
// cidSetVisitor is an ObjectVisitor backed by an in-memory cid.Set.
type cidSetVisitor struct {
set *cid.Set
}
var _ ObjectVisitor = (*cidSetVisitor)(nil)
// Visit returns the result of the underlying set's Visit for c; the error is
// always nil.
func (v *cidSetVisitor) Visit(c cid.Cid) (bool, error) {
return v.set.Visit(c), nil
}
// tmpVisitor returns an ObjectVisitor backed by a fresh, empty in-memory cid.Set.
func tmpVisitor() ObjectVisitor {
return &cidSetVisitor{set: cid.NewSet()}
}