dynamically size bloom filters

This commit is contained in:
vyzo 2021-02-28 21:35:18 +02:00
parent 5639261e44
commit cae5ddce88
5 changed files with 60 additions and 8 deletions

View File

@ -17,7 +17,7 @@ type LiveSet interface {
var markBytes = []byte{}
// LiveSetEnv creates LiveSets by name and can be closed.
// NOTE(review): this span is a diff rendering that lists both the previous
// and the updated NewLiveSet signature; only one of them can exist in the
// real interface — confirm against the committed file.
type LiveSetEnv interface {
NewLiveSet(name string) (LiveSet, error)
// NewLiveSet returns the LiveSet identified by name. sizeHint is a sizing
// hint supplied by the caller (the split store passes its live set size
// estimate); the bloom-backed environment uses it to size its filter.
NewLiveSet(name string, sizeHint int64) (LiveSet, error)
Close() error
}

View File

@ -30,14 +30,19 @@ func NewBloomLiveSetEnv() (*BloomLiveSetEnv, error) {
return &BloomLiveSetEnv{}, nil
}
func (e *BloomLiveSetEnv) NewLiveSet(name string) (LiveSet, error) {
func (e *BloomLiveSetEnv) NewLiveSet(name string, sizeHint int64) (LiveSet, error) {
size := int64(BloomFilterSize)
for size < sizeHint {
size += BloomFilterSize
}
salt := make([]byte, 4)
_, err := rand.Read(salt) //nolint
if err != nil {
return nil, xerrors.Errorf("error reading salt: %w", err)
}
bf, err := bbloom.New(float64(BloomFilterSize), float64(BloomFilterProbability))
bf, err := bbloom.New(float64(size), float64(BloomFilterProbability))
if err != nil {
return nil, xerrors.Errorf("error creating bloom filter: %w", err)
}

View File

@ -35,7 +35,7 @@ func NewBoltLiveSetEnv(path string) (*BoltLiveSetEnv, error) {
return &BoltLiveSetEnv{db: db}, nil
}
func (e *BoltLiveSetEnv) NewLiveSet(name string) (LiveSet, error) {
func (e *BoltLiveSetEnv) NewLiveSet(name string, hint int64) (LiveSet, error) {
bucketId := []byte(name)
err := e.db.Update(func(tx *bolt.Tx) error {
_, err := tx.CreateBucketIfNotExists(bucketId)

View File

@ -57,7 +57,7 @@ func NewLMDBLiveSetEnv(path string) (*LMDBLiveSetEnv, error) {
return &LMDBLiveSetEnv{env: env}, nil
}
// NewLiveSet returns the result of NewLMDBLiveSet for this environment, using
// name with a ".lmdb" suffix. The hint argument of the updated signature is
// not used by this implementation.
// NOTE(review): two signature lines appear here because this is a diff
// rendering (previous and updated header) — confirm against the committed file.
func (e *LMDBLiveSetEnv) NewLiveSet(name string) (LiveSet, error) {
func (e *LMDBLiveSetEnv) NewLiveSet(name string, hint int64) (LiveSet, error) {
return NewLMDBLiveSet(e.env, name+".lmdb")
}

View File

@ -75,6 +75,8 @@ type SplitStore struct {
snoop TrackingStore
env LiveSetEnv
liveSetSize int64
}
var _ bstore.Blockstore = (*SplitStore)(nil)
@ -328,6 +330,15 @@ func (s *SplitStore) HeadChange(revert, apply []*types.TipSet) error {
// Compaction/GC Algorithm
func (s *SplitStore) compact() {
if s.liveSetSize == 0 {
start := time.Now()
log.Info("estimating live set size")
s.estimateLiveSetSize()
log.Infow("estimating live set size done", "took", time.Since(start), "size", s.liveSetSize)
} else {
log.Infow("current live set size estimate", "size", s.liveSetSize)
}
if s.fullCompaction {
s.compactFull()
} else {
@ -335,6 +346,24 @@ func (s *SplitStore) compact() {
}
}
// estimateLiveSetSize recomputes s.liveSetSize by walking the snapshot that
// starts at the current tipset and counting one unit per callback invocation.
//
// The current tipset is read while holding s.mx; the lock is released before
// the walk begins. If the walk returns an error the function panics.
func (s *SplitStore) estimateLiveSetSize() {
	// Grab the head tipset under the lock, then drop the lock for the walk.
	s.mx.Lock()
	head := s.curTs
	s.mx.Unlock()

	// Reset first so the counter reflects only this walk.
	s.liveSetSize = 0
	countVisit := func(_ cid.Cid) error {
		s.liveSetSize++
		return nil
	}

	if err := s.cs.WalkSnapshot(context.Background(), head, 1, s.skipOldMsgs, s.skipMsgReceipts, countVisit); err != nil {
		// TODO do something better here
		panic(err)
	}
}
func (s *SplitStore) compactSimple() {
s.mx.Lock()
curTs := s.curTs
@ -344,7 +373,7 @@ func (s *SplitStore) compactSimple() {
log.Infow("running simple compaction", "currentEpoch", curTs.Height(), "baseEpoch", s.baseEpoch, "coldEpoch", coldEpoch)
coldSet, err := s.env.NewLiveSet("cold")
coldSet, err := s.env.NewLiveSet("cold", s.liveSetSize)
if err != nil {
// TODO do something better here
panic(err)
@ -361,8 +390,10 @@ func (s *SplitStore) compactSimple() {
panic(err)
}
count := int64(0)
err = s.cs.WalkSnapshot(context.Background(), coldTs, 1, s.skipOldMsgs, s.skipMsgReceipts,
func(cid cid.Cid) error {
count++
return coldSet.Mark(cid)
})
@ -371,6 +402,10 @@ func (s *SplitStore) compactSimple() {
panic(err)
}
if count > s.liveSetSize {
s.liveSetSize = count
}
log.Infow("marking done", "took", time.Since(startMark))
// 2. move cold unreachable objects to the coldstore
@ -519,14 +554,14 @@ func (s *SplitStore) compactFull() {
// create two live sets, one for marking the cold finality region
// and one for marking the hot region
hotSet, err := s.env.NewLiveSet("hot")
hotSet, err := s.env.NewLiveSet("hot", s.liveSetSize)
if err != nil {
// TODO do something better here
panic(err)
}
defer hotSet.Close() //nolint:errcheck
coldSet, err := s.env.NewLiveSet("cold")
coldSet, err := s.env.NewLiveSet("cold", s.liveSetSize)
if err != nil {
// TODO do something better here
panic(err)
@ -538,8 +573,10 @@ func (s *SplitStore) compactFull() {
startMark := time.Now()
// Phase 1a: mark all reachable CIDs in the hot range
count := int64(0)
err = s.cs.WalkSnapshot(context.Background(), curTs, epoch-coldEpoch, s.skipOldMsgs, s.skipMsgReceipts,
func(cid cid.Cid) error {
count++
return hotSet.Mark(cid)
})
@ -548,6 +585,10 @@ func (s *SplitStore) compactFull() {
panic(err)
}
if count > s.liveSetSize {
s.liveSetSize = count
}
// Phase 1b: mark all reachable CIDs in the cold range
coldTs, err := s.cs.GetTipsetByHeight(context.Background(), coldEpoch, curTs, true)
if err != nil {
@ -555,8 +596,10 @@ func (s *SplitStore) compactFull() {
panic(err)
}
count = 0
err = s.cs.WalkSnapshot(context.Background(), coldTs, CompactionCold, s.skipOldMsgs, s.skipMsgReceipts,
func(cid cid.Cid) error {
count++
return coldSet.Mark(cid)
})
@ -565,6 +608,10 @@ func (s *SplitStore) compactFull() {
panic(err)
}
if count > s.liveSetSize {
s.liveSetSize = count
}
log.Infow("marking done", "took", time.Since(startMark))
// Phase 2: sweep cold objects: