lotus/blockstore/splitstore/markset_bloom.go

108 lines
1.8 KiB
Go
Raw Normal View History

2021-02-28 07:59:11 +00:00
package splitstore
import (
2021-03-01 16:50:09 +00:00
"crypto/rand"
2021-03-02 08:48:59 +00:00
"crypto/sha256"
2021-07-07 13:46:14 +00:00
"sync"
2021-02-28 07:59:11 +00:00
"golang.org/x/xerrors"
bbloom "github.com/ipfs/bbloom"
cid "github.com/ipfs/go-cid"
)
const (
	// BloomFilterMinSize is the smallest capacity BloomMarkSetEnv.Create
	// requests for a filter, and also the step by which that capacity is
	// grown until it covers the caller's size hint.
	BloomFilterMinSize = 10_000_000

	// BloomFilterProbability is passed to bbloom.New as its second argument
	// when a filter is created.
	// NOTE(review): presumably the target false-positive rate — confirm
	// against the bbloom documentation.
	BloomFilterProbability = 0.01
)
// BloomMarkSetEnv is a MarkSetEnv whose Create method returns mark sets
// backed by bloom filters. It carries no state of its own.
type BloomMarkSetEnv struct{}
2021-02-28 07:59:11 +00:00
// Compile-time assertion that *BloomMarkSetEnv satisfies MarkSetEnv.
var _ MarkSetEnv = (*BloomMarkSetEnv)(nil)
2021-02-28 07:59:11 +00:00
type BloomMarkSet struct {
2021-02-28 07:59:11 +00:00
salt []byte
2021-07-08 07:20:29 +00:00
mx sync.RWMutex
2021-02-28 07:59:11 +00:00
bf *bbloom.Bloom
ts bool
2021-02-28 07:59:11 +00:00
}
// Compile-time assertion that *BloomMarkSet satisfies MarkSet.
var _ MarkSet = (*BloomMarkSet)(nil)
2021-02-28 07:59:11 +00:00
func NewBloomMarkSetEnv() (*BloomMarkSetEnv, error) {
return &BloomMarkSetEnv{}, nil
2021-02-28 07:59:11 +00:00
}
func (e *BloomMarkSetEnv) Create(name string, sizeHint int64) (MarkSet, error) {
2021-02-28 19:40:26 +00:00
size := int64(BloomFilterMinSize)
2021-02-28 19:35:18 +00:00
for size < sizeHint {
2021-02-28 19:40:26 +00:00
size += BloomFilterMinSize
2021-02-28 19:35:18 +00:00
}
2021-02-28 07:59:11 +00:00
salt := make([]byte, 4)
2021-03-01 16:50:09 +00:00
_, err := rand.Read(salt)
2021-02-28 07:59:11 +00:00
if err != nil {
return nil, xerrors.Errorf("error reading salt: %w", err)
}
2021-03-01 17:39:00 +00:00
bf, err := bbloom.New(float64(size), BloomFilterProbability)
2021-02-28 07:59:11 +00:00
if err != nil {
return nil, xerrors.Errorf("error creating bloom filter: %w", err)
}
return &BloomMarkSet{salt: salt, bf: bf}, nil
2021-02-28 07:59:11 +00:00
}
func (e *BloomMarkSetEnv) Close() error {
2021-02-28 07:59:11 +00:00
return nil
}
func (s *BloomMarkSet) saltedKey(cid cid.Cid) []byte {
2021-02-28 07:59:11 +00:00
hash := cid.Hash()
key := make([]byte, len(s.salt)+len(hash))
n := copy(key, s.salt)
copy(key[n:], hash)
2021-03-02 08:48:59 +00:00
rehash := sha256.Sum256(key)
2021-02-28 08:39:25 +00:00
return rehash[:]
2021-02-28 07:59:11 +00:00
}
func (s *BloomMarkSet) Mark(cid cid.Cid) error {
if s.ts {
2021-07-07 13:46:14 +00:00
s.mx.Lock()
defer s.mx.Unlock()
}
if s.bf == nil {
return errMarkSetClosed
}
2021-07-07 13:46:14 +00:00
s.bf.Add(s.saltedKey(cid))
2021-02-28 07:59:11 +00:00
return nil
}
func (s *BloomMarkSet) Has(cid cid.Cid) (bool, error) {
if s.ts {
2021-07-08 07:20:29 +00:00
s.mx.RLock()
defer s.mx.RUnlock()
}
2021-07-07 13:46:14 +00:00
if s.bf == nil {
return false, errMarkSetClosed
}
2021-07-07 13:46:14 +00:00
return s.bf.Has(s.saltedKey(cid)), nil
2021-02-28 07:59:11 +00:00
}
func (s *BloomMarkSet) Close() error {
if s.ts {
s.mx.Lock()
defer s.mx.Unlock()
}
s.bf = nil
2021-02-28 07:59:11 +00:00
return nil
}
// SetConcurrent switches the set into concurrent mode by setting ts, after
// which Mark, Has and Close acquire mx around their work.
// NOTE(review): ts is written here without holding mx, so this presumably
// must be called before the set is shared between goroutines — confirm
// against callers.
func (s *BloomMarkSet) SetConcurrent() {
s.ts = true
}