markset-backed visitors

This commit is contained in:
vyzo 2021-07-30 09:42:20 +03:00
parent 6f22cffb6b
commit 1323d8fb20
5 changed files with 249 additions and 1 deletions

View File

@ -21,8 +21,14 @@ type MarkSet interface {
SetConcurrent()
}
type MarkSetVisitor interface {
ObjectVisitor
Close() error
}
type MarkSetEnv interface {
Create(name string, sizeHint int64) (MarkSet, error)
CreateVisitor(name string, sizeHint int64) (MarkSetVisitor, error)
Close() error
}

View File

@ -34,6 +34,15 @@ type BadgerMarkSet struct {
var _ MarkSet = (*BadgerMarkSet)(nil)
type BadgerMarkSetVisitor struct {
pend map[string]struct{}
db *badger.DB
path string
}
var _ MarkSetVisitor = (*BadgerMarkSetVisitor)(nil)
var badgerMarkSetBatchSize = 16384
func NewBadgerMarkSetEnv(path string) (MarkSetEnv, error) {
@ -92,6 +101,50 @@ func (e *BadgerMarkSetEnv) Create(name string, sizeHint int64) (MarkSet, error)
return ms, nil
}
func (e *BadgerMarkSetEnv) CreateVisitor(name string, sizeHint int64) (MarkSetVisitor, error) {
path := filepath.Join(e.path, name)
// clean up first
err := os.RemoveAll(path)
if err != nil {
return nil, xerrors.Errorf("error clearing markset directory: %w", err)
}
err = os.MkdirAll(path, 0755) //nolint:gosec
if err != nil {
return nil, xerrors.Errorf("error creating markset directory: %w", err)
}
opts := badger.DefaultOptions(path)
opts.SyncWrites = false
opts.CompactL0OnClose = false
opts.Compression = options.None
// Note: We use FileIO for loading modes to avoid memory thrashing and interference
// between the system blockstore and the markset.
// It was observed that using the default memory mapped option resulted in
// significant interference and unacceptably high block validation times once the markset
// exceeded 1GB in size.
opts.TableLoadingMode = options.FileIO
opts.ValueLogLoadingMode = options.FileIO
opts.Logger = &badgerLogger{
SugaredLogger: log.Desugar().WithOptions(zap.AddCallerSkip(1)).Sugar(),
skip2: log.Desugar().WithOptions(zap.AddCallerSkip(2)).Sugar(),
}
db, err := badger.Open(opts)
if err != nil {
return nil, xerrors.Errorf("error creating badger markset: %w", err)
}
v := &BadgerMarkSetVisitor{
pend: make(map[string]struct{}),
db: db,
path: path,
}
return v, nil
}
func (e *BadgerMarkSetEnv) Close() error {
return os.RemoveAll(e.path)
}
@ -219,6 +272,82 @@ func (s *BadgerMarkSet) Close() error {
func (s *BadgerMarkSet) SetConcurrent() {}
func (v *BadgerMarkSetVisitor) Visit(c cid.Cid) (bool, error) {
if v.pend == nil {
return false, errMarkSetClosed
}
key := c.Hash()
pendKey := string(key)
_, ok := v.pend[pendKey]
if ok {
return false, nil
}
err := v.db.View(func(txn *badger.Txn) error {
_, err := txn.Get(key)
return err
})
switch err {
case nil:
return false, nil
case badger.ErrKeyNotFound:
v.pend[pendKey] = struct{}{}
if len(v.pend) < badgerMarkSetBatchSize {
return true, nil
}
pend := v.pend
v.pend = make(map[string]struct{})
empty := []byte{} // not nil
batch := v.db.NewWriteBatch()
defer batch.Cancel()
for k := range pend {
if err := batch.Set([]byte(k), empty); err != nil {
return false, err
}
}
err := batch.Flush()
if err != nil {
return false, xerrors.Errorf("error flushing batch to badger markset: %w", err)
}
return true, nil
default:
return false, xerrors.Errorf("error checking badger markset: %w", err)
}
}
func (v *BadgerMarkSetVisitor) Close() error {
if v.pend == nil {
return nil
}
v.pend = nil
db := v.db
v.db = nil
err := db.Close()
if err != nil {
return xerrors.Errorf("error closing badger markset: %w", err)
}
err = os.RemoveAll(v.path)
if err != nil {
return xerrors.Errorf("error deleting badger markset: %w", err)
}
return nil
}
// badger logging through go-log
type badgerLogger struct {
*zap.SugaredLogger

View File

@ -53,6 +53,10 @@ func (e *BloomMarkSetEnv) Create(name string, sizeHint int64) (MarkSet, error) {
return &BloomMarkSet{salt: salt, bf: bf}, nil
}
func (e *BloomMarkSetEnv) CreateVisitor(name string, sizeHint int64) (MarkSetVisitor, error) {
return nil, xerrors.Errorf("bloom mark set does not support visitors due to false positives")
}
func (e *BloomMarkSetEnv) Close() error {
return nil
}

View File

@ -19,6 +19,12 @@ type MapMarkSet struct {
var _ MarkSet = (*MapMarkSet)(nil)
type MapMarkSetVisitor struct {
set map[string]struct{}
}
var _ MarkSetVisitor = (*MapMarkSetVisitor)(nil)
func NewMapMarkSetEnv() (*MapMarkSetEnv, error) {
return &MapMarkSetEnv{}, nil
}
@ -29,6 +35,12 @@ func (e *MapMarkSetEnv) Create(name string, sizeHint int64) (MarkSet, error) {
}, nil
}
func (e *MapMarkSetEnv) CreateVisitor(name string, sizeHint int64) (MarkSetVisitor, error) {
return &MapMarkSetVisitor{
set: make(map[string]struct{}, sizeHint),
}, nil
}
func (e *MapMarkSetEnv) Close() error {
return nil
}
@ -73,3 +85,22 @@ func (s *MapMarkSet) Close() error {
func (s *MapMarkSet) SetConcurrent() {
s.ts = true
}
func (v *MapMarkSetVisitor) Visit(c cid.Cid) (bool, error) {
if v.set == nil {
return false, errMarkSetClosed
}
key := string(c.Hash())
if _, ok := v.set[key]; ok {
return false, nil
}
v.set[key] = struct{}{}
return true, nil
}
func (v *MapMarkSetVisitor) Close() error {
v.set = nil
return nil
}

View File

@ -2,6 +2,7 @@ package splitstore
import (
"io/ioutil"
"os"
"testing"
cid "github.com/ipfs/go-cid"
@ -10,6 +11,7 @@ import (
func TestMapMarkSet(t *testing.T) {
testMarkSet(t, "map")
testMarkSetVisitor(t, "map")
}
func TestBloomMarkSet(t *testing.T) {
@ -23,16 +25,21 @@ func TestBadgerMarkSet(t *testing.T) {
badgerMarkSetBatchSize = bs
})
testMarkSet(t, "badger")
testMarkSetVisitor(t, "badger")
}
func testMarkSet(t *testing.T, lsType string) {
t.Helper()
path, err := ioutil.TempDir("", "sweep-test.*")
path, err := ioutil.TempDir("", "markset.*")
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() {
_ = os.RemoveAll(path)
})
env, err := OpenMarkSetEnv(path, lsType)
if err != nil {
t.Fatal(err)
@ -145,3 +152,74 @@ func testMarkSet(t *testing.T, lsType string) {
t.Fatal(err)
}
}
func testMarkSetVisitor(t *testing.T, lsType string) {
t.Helper()
path, err := ioutil.TempDir("", "markset.*")
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() {
_ = os.RemoveAll(path)
})
env, err := OpenMarkSetEnv(path, lsType)
if err != nil {
t.Fatal(err)
}
defer env.Close() //nolint:errcheck
visitor, err := env.CreateVisitor("test", 0)
if err != nil {
t.Fatal(err)
}
defer visitor.Close() //nolint:errcheck
makeCid := func(key string) cid.Cid {
h, err := multihash.Sum([]byte(key), multihash.SHA2_256, -1)
if err != nil {
t.Fatal(err)
}
return cid.NewCidV1(cid.Raw, h)
}
mustVisit := func(v ObjectVisitor, cid cid.Cid) {
visit, err := v.Visit(cid)
if err != nil {
t.Fatal(err)
}
if !visit {
t.Fatal("object should be visited")
}
}
mustNotVisit := func(v ObjectVisitor, cid cid.Cid) {
visit, err := v.Visit(cid)
if err != nil {
t.Fatal(err)
}
if visit {
t.Fatal("unexpected visit")
}
}
k1 := makeCid("a")
k2 := makeCid("b")
k3 := makeCid("c")
k4 := makeCid("d")
mustVisit(visitor, k1)
mustVisit(visitor, k2)
mustVisit(visitor, k3)
mustVisit(visitor, k4)
mustNotVisit(visitor, k1)
mustNotVisit(visitor, k2)
mustNotVisit(visitor, k3)
mustNotVisit(visitor, k4)
}