diff --git a/chain/store/splitstore/liveset.go b/chain/store/splitstore/liveset.go index 6b64a3a56..62a5d6913 100644 --- a/chain/store/splitstore/liveset.go +++ b/chain/store/splitstore/liveset.go @@ -17,7 +17,7 @@ type LiveSet interface { var markBytes = []byte{} type LiveSetEnv interface { - NewLiveSet(name string) (LiveSet, error) + NewLiveSet(name string, sizeHint int64) (LiveSet, error) Close() error } diff --git a/chain/store/splitstore/liveset_bloom.go b/chain/store/splitstore/liveset_bloom.go index f4d28a5dd..9fc3503b3 100644 --- a/chain/store/splitstore/liveset_bloom.go +++ b/chain/store/splitstore/liveset_bloom.go @@ -30,14 +30,19 @@ func NewBloomLiveSetEnv() (*BloomLiveSetEnv, error) { return &BloomLiveSetEnv{}, nil } -func (e *BloomLiveSetEnv) NewLiveSet(name string) (LiveSet, error) { +func (e *BloomLiveSetEnv) NewLiveSet(name string, sizeHint int64) (LiveSet, error) { + size := int64(BloomFilterSize) + for size < sizeHint { + size += BloomFilterSize + } + salt := make([]byte, 4) _, err := rand.Read(salt) //nolint if err != nil { return nil, xerrors.Errorf("error reading salt: %w", err) } - bf, err := bbloom.New(float64(BloomFilterSize), float64(BloomFilterProbability)) + bf, err := bbloom.New(float64(size), float64(BloomFilterProbability)) if err != nil { return nil, xerrors.Errorf("error creating bloom filter: %w", err) } diff --git a/chain/store/splitstore/liveset_bolt.go b/chain/store/splitstore/liveset_bolt.go index 0d6d7cd2e..8c68d6a4a 100644 --- a/chain/store/splitstore/liveset_bolt.go +++ b/chain/store/splitstore/liveset_bolt.go @@ -35,7 +35,7 @@ func NewBoltLiveSetEnv(path string) (*BoltLiveSetEnv, error) { return &BoltLiveSetEnv{db: db}, nil } -func (e *BoltLiveSetEnv) NewLiveSet(name string) (LiveSet, error) { +func (e *BoltLiveSetEnv) NewLiveSet(name string, hint int64) (LiveSet, error) { bucketId := []byte(name) err := e.db.Update(func(tx *bolt.Tx) error { _, err := tx.CreateBucketIfNotExists(bucketId) diff --git a/chain/store/splitstore/liveset_lmdb.go b/chain/store/splitstore/liveset_lmdb.go index f41907207..e8123f104 100644 --- a/chain/store/splitstore/liveset_lmdb.go +++ b/chain/store/splitstore/liveset_lmdb.go @@ -57,7 +57,7 @@ func NewLMDBLiveSetEnv(path string) (*LMDBLiveSetEnv, error) { return &LMDBLiveSetEnv{env: env}, nil } -func (e *LMDBLiveSetEnv) NewLiveSet(name string) (LiveSet, error) { +func (e *LMDBLiveSetEnv) NewLiveSet(name string, hint int64) (LiveSet, error) { return NewLMDBLiveSet(e.env, name+".lmdb") } diff --git a/chain/store/splitstore/splitstore.go b/chain/store/splitstore/splitstore.go index 8001f792c..e20615e68 100644 --- a/chain/store/splitstore/splitstore.go +++ b/chain/store/splitstore/splitstore.go @@ -75,6 +75,8 @@ type SplitStore struct { snoop TrackingStore env LiveSetEnv + + liveSetSize int64 } var _ bstore.Blockstore = (*SplitStore)(nil) @@ -328,6 +330,15 @@ func (s *SplitStore) HeadChange(revert, apply []*types.TipSet) error { // Compaction/GC Algorithm func (s *SplitStore) compact() { + if s.liveSetSize == 0 { + start := time.Now() + log.Info("estimating live set size") + s.estimateLiveSetSize() + log.Infow("estimating live set size done", "took", time.Since(start), "size", s.liveSetSize) + } else { + log.Infow("current live set size estimate", "size", s.liveSetSize) + } + if s.fullCompaction { s.compactFull() } else { @@ -335,6 +346,24 @@ func (s *SplitStore) compact() { } } +func (s *SplitStore) estimateLiveSetSize() { + s.mx.Lock() + curTs := s.curTs + s.mx.Unlock() + + s.liveSetSize = 0 + err := s.cs.WalkSnapshot(context.Background(), curTs, 1, s.skipOldMsgs, s.skipMsgReceipts, + func(cid cid.Cid) error { + s.liveSetSize++ + return nil + }) + + if err != nil { + // TODO do something better here + panic(err) + } +} + func (s *SplitStore) compactSimple() { s.mx.Lock() curTs := s.curTs @@ -344,7 +373,7 @@ func (s *SplitStore) compactSimple() { log.Infow("running simple compaction", "currentEpoch", curTs.Height(), "baseEpoch", s.baseEpoch, "coldEpoch", coldEpoch) - coldSet, err := s.env.NewLiveSet("cold") + coldSet, err := s.env.NewLiveSet("cold", s.liveSetSize) if err != nil { // TODO do something better here panic(err) @@ -361,8 +390,10 @@ func (s *SplitStore) compactSimple() { panic(err) } + count := int64(0) err = s.cs.WalkSnapshot(context.Background(), coldTs, 1, s.skipOldMsgs, s.skipMsgReceipts, func(cid cid.Cid) error { + count++ return coldSet.Mark(cid) }) @@ -371,6 +402,10 @@ func (s *SplitStore) compactSimple() { panic(err) } + if count > s.liveSetSize { + s.liveSetSize = count + } + log.Infow("marking done", "took", time.Since(startMark)) // 2. move cold unreachable objects to the coldstore @@ -519,14 +554,14 @@ func (s *SplitStore) compactFull() { // create two live sets, one for marking the cold finality region // and one for marking the hot region - hotSet, err := s.env.NewLiveSet("hot") + hotSet, err := s.env.NewLiveSet("hot", s.liveSetSize) if err != nil { // TODO do something better here panic(err) } defer hotSet.Close() //nolint:errcheck - coldSet, err := s.env.NewLiveSet("cold") + coldSet, err := s.env.NewLiveSet("cold", s.liveSetSize) if err != nil { // TODO do something better here panic(err) @@ -538,8 +573,10 @@ func (s *SplitStore) compactFull() { startMark := time.Now() // Phase 1a: mark all reachable CIDs in the hot range + count := int64(0) err = s.cs.WalkSnapshot(context.Background(), curTs, epoch-coldEpoch, s.skipOldMsgs, s.skipMsgReceipts, func(cid cid.Cid) error { + count++ return hotSet.Mark(cid) }) @@ -548,6 +585,10 @@ func (s *SplitStore) compactFull() { panic(err) } + if count > s.liveSetSize { + s.liveSetSize = count + } + // Phase 1b: mark all reachable CIDs in the cold range coldTs, err := s.cs.GetTipsetByHeight(context.Background(), coldEpoch, curTs, true) if err != nil { @@ -555,8 +596,10 @@ func (s *SplitStore) compactFull() { panic(err) } + count = 0 err = s.cs.WalkSnapshot(context.Background(), coldTs, CompactionCold, s.skipOldMsgs, s.skipMsgReceipts, func(cid cid.Cid) error { + count++ return coldSet.Mark(cid) }) @@ -565,6 +608,10 @@ func (s *SplitStore) compactFull() { panic(err) } + if count > s.liveSetSize { + s.liveSetSize = count + } + log.Infow("marking done", "took", time.Since(startMark)) // Phase 2: sweep cold objects: