From 1323d8fb204f0e722bda2d2018f34028a7f830d6 Mon Sep 17 00:00:00 2001
From: vyzo
Date: Fri, 30 Jul 2021 09:42:20 +0300
Subject: [PATCH] markset-backed visitors

---
Review notes (ignored by git am):
- doc comments added to the new exported types and methods
- BadgerMarkSetVisitor keeps its buffered keys when a batch write fails
- BadgerMarkSetVisitor.Close always tries to delete the directory
- BadgerMarkSetEnv.CreateVisitor removes the directory if badger.Open fails
- hunk line counts and the diffstat were recomputed; the blob hashes on the
  index lines were NOT regenerated

 blockstore/splitstore/markset.go        |   8 ++
 blockstore/splitstore/markset_badger.go | 159 ++++++++++++++++++++++++
 blockstore/splitstore/markset_bloom.go  |   6 +
 blockstore/splitstore/markset_map.go    |  38 ++++++
 blockstore/splitstore/markset_test.go   |  80 +++++++++++-
 5 files changed, 290 insertions(+), 1 deletion(-)

diff --git a/blockstore/splitstore/markset.go b/blockstore/splitstore/markset.go
index 458ea8beb..b75be99c4 100644
--- a/blockstore/splitstore/markset.go
+++ b/blockstore/splitstore/markset.go
@@ -21,8 +21,16 @@ type MarkSet interface {
 	SetConcurrent()
 }
 
+// MarkSetVisitor is an ObjectVisitor created through a MarkSetEnv.
+// Close disposes of whatever state backs the visitor.
+type MarkSetVisitor interface {
+	ObjectVisitor
+	Close() error
+}
+
 type MarkSetEnv interface {
 	Create(name string, sizeHint int64) (MarkSet, error)
+	CreateVisitor(name string, sizeHint int64) (MarkSetVisitor, error)
 	Close() error
 }
 
diff --git a/blockstore/splitstore/markset_badger.go b/blockstore/splitstore/markset_badger.go
index ef67db213..adc86007b 100644
--- a/blockstore/splitstore/markset_badger.go
+++ b/blockstore/splitstore/markset_badger.go
@@ -34,6 +34,19 @@ type BadgerMarkSet struct {
 
 var _ MarkSet = (*BadgerMarkSet)(nil)
 
+// BadgerMarkSetVisitor is a MarkSetVisitor that tracks visited objects in a
+// dedicated badger database, keyed by the multihash of each cid.
+type BadgerMarkSetVisitor struct {
+	// pend holds keys that were reported as visited but are not yet written
+	// to db; it is nil once the visitor has been closed.
+	pend map[string]struct{}
+
+	db   *badger.DB
+	path string // directory holding the database; deleted on Close
+}
+
+var _ MarkSetVisitor = (*BadgerMarkSetVisitor)(nil)
+
 var badgerMarkSetBatchSize = 16384
 
 func NewBadgerMarkSetEnv(path string) (MarkSetEnv, error) {
@@ -92,6 +105,55 @@ func (e *BadgerMarkSetEnv) Create(name string, sizeHint int64) (MarkSet, error)
 	return ms, nil
 }
 
+// CreateVisitor creates a visitor backed by a fresh badger database in the
+// subdirectory name of the environment path; anything already there is
+// deleted first. sizeHint is not used by this implementation.
+func (e *BadgerMarkSetEnv) CreateVisitor(name string, sizeHint int64) (MarkSetVisitor, error) {
+	path := filepath.Join(e.path, name)
+
+	// clean up first
+	err := os.RemoveAll(path)
+	if err != nil {
+		return nil, xerrors.Errorf("error clearing markset directory: %w", err)
+	}
+
+	err = os.MkdirAll(path, 0755) //nolint:gosec
+	if err != nil {
+		return nil, xerrors.Errorf("error creating markset directory: %w", err)
+	}
+
+	opts := badger.DefaultOptions(path)
+	opts.SyncWrites = false
+	opts.CompactL0OnClose = false
+	opts.Compression = options.None
+	// Note: We use FileIO for loading modes to avoid memory thrashing and interference
+	//       between the system blockstore and the markset.
+	//       It was observed that using the default memory mapped option resulted in
+	//       significant interference and unacceptably high block validation times once the markset
+	//       exceeded 1GB in size.
+	opts.TableLoadingMode = options.FileIO
+	opts.ValueLogLoadingMode = options.FileIO
+	opts.Logger = &badgerLogger{
+		SugaredLogger: log.Desugar().WithOptions(zap.AddCallerSkip(1)).Sugar(),
+		skip2:         log.Desugar().WithOptions(zap.AddCallerSkip(2)).Sugar(),
+	}
+
+	db, err := badger.Open(opts)
+	if err != nil {
+		// best effort: don't leave the freshly created directory behind
+		_ = os.RemoveAll(path)
+		return nil, xerrors.Errorf("error creating badger markset: %w", err)
+	}
+
+	v := &BadgerMarkSetVisitor{
+		pend: make(map[string]struct{}),
+		db:   db,
+		path: path,
+	}
+
+	return v, nil
+}
+
 func (e *BadgerMarkSetEnv) Close() error {
 	return os.RemoveAll(e.path)
 }
@@ -219,6 +281,103 @@ func (s *BadgerMarkSet) Close() error {
 
 func (s *BadgerMarkSet) SetConcurrent() {}
 
+// Visit returns true the first time the multihash of c is seen and false on
+// every later call. A closed visitor returns errMarkSetClosed.
+//
+// Newly seen keys are buffered in memory and written to badger in one batch
+// once badgerMarkSetBatchSize of them have accumulated. If that write fails,
+// the error is returned and the keys seen earlier stay buffered.
+func (v *BadgerMarkSetVisitor) Visit(c cid.Cid) (bool, error) {
+	if v.pend == nil {
+		return false, errMarkSetClosed
+	}
+
+	key := c.Hash()
+	pendKey := string(key)
+	_, ok := v.pend[pendKey]
+	if ok {
+		return false, nil
+	}
+
+	err := v.db.View(func(txn *badger.Txn) error {
+		_, err := txn.Get(key)
+		return err
+	})
+
+	switch err {
+	case nil:
+		return false, nil
+
+	case badger.ErrKeyNotFound:
+		v.pend[pendKey] = struct{}{}
+
+		if len(v.pend) < badgerMarkSetBatchSize {
+			return true, nil
+		}
+
+		if err := v.flush(); err != nil {
+			// the caller is not told to visit this object, so forget it again
+			delete(v.pend, pendKey)
+			return false, err
+		}
+
+		return true, nil
+
+	default:
+		return false, xerrors.Errorf("error checking badger markset: %w", err)
+	}
+}
+
+// flush writes all pending keys to badger. The pending set is reset only once
+// the batch has been flushed successfully, so a failed flush loses no keys.
+func (v *BadgerMarkSetVisitor) flush() error {
+	empty := []byte{} // not nil
+
+	batch := v.db.NewWriteBatch()
+	defer batch.Cancel()
+
+	for k := range v.pend {
+		if err := batch.Set([]byte(k), empty); err != nil {
+			return xerrors.Errorf("error writing batch to badger markset: %w", err)
+		}
+	}
+
+	if err := batch.Flush(); err != nil {
+		return xerrors.Errorf("error flushing batch to badger markset: %w", err)
+	}
+
+	v.pend = make(map[string]struct{})
+	return nil
+}
+
+// Close closes the badger database and deletes its directory. Pending keys
+// are dropped without being written, as the database is deleted anyway.
+// Calling Close on an already closed visitor is a no-op.
+func (v *BadgerMarkSetVisitor) Close() error {
+	if v.pend == nil {
+		return nil
+	}
+
+	v.pend = nil
+	db := v.db
+	v.db = nil
+
+	// always try to delete the directory, even if closing the db failed,
+	// so that a close error does not leak the on-disk data
+	closeErr := db.Close()
+	rmErr := os.RemoveAll(v.path)
+
+	if closeErr != nil {
+		return xerrors.Errorf("error closing badger markset: %w", closeErr)
+	}
+
+	if rmErr != nil {
+		return xerrors.Errorf("error deleting badger markset: %w", rmErr)
+	}
+
+	return nil
+}
+
 // badger logging through go-log
 type badgerLogger struct {
 	*zap.SugaredLogger
diff --git a/blockstore/splitstore/markset_bloom.go b/blockstore/splitstore/markset_bloom.go
index 9261de7c7..745260633 100644
--- a/blockstore/splitstore/markset_bloom.go
+++ b/blockstore/splitstore/markset_bloom.go
@@ -53,6 +53,12 @@ func (e *BloomMarkSetEnv) Create(name string, sizeHint int64) (MarkSet, error) {
 	return &BloomMarkSet{salt: salt, bf: bf}, nil
 }
 
+// CreateVisitor always fails: a bloom filter can report false positives, so
+// it cannot tell reliably whether an object has already been visited.
+func (e *BloomMarkSetEnv) CreateVisitor(name string, sizeHint int64) (MarkSetVisitor, error) {
+	return nil, xerrors.New("bloom mark set does not support visitors due to false positives")
+}
+
 func (e *BloomMarkSetEnv) Close() error {
 	return nil
 }
diff --git a/blockstore/splitstore/markset_map.go b/blockstore/splitstore/markset_map.go
index 197c82424..8ec7b428b 100644
--- a/blockstore/splitstore/markset_map.go
+++ b/blockstore/splitstore/markset_map.go
@@ -19,6 +19,14 @@ type MapMarkSet struct {
 
 var _ MarkSet = (*MapMarkSet)(nil)
 
+// MapMarkSetVisitor is a MarkSetVisitor that keeps the set of visited
+// multihashes in an in-memory map.
+type MapMarkSetVisitor struct {
+	set map[string]struct{} // nil once the visitor has been closed
+}
+
+var _ MarkSetVisitor = (*MapMarkSetVisitor)(nil)
+
 func NewMapMarkSetEnv() (*MapMarkSetEnv, error) {
 	return &MapMarkSetEnv{}, nil
 }
@@ -29,6 +37,14 @@ func (e *MapMarkSetEnv) Create(name string, sizeHint int64) (MarkSet, error) {
 	}, nil
 }
 
+// CreateVisitor creates an in-memory visitor; name is ignored and sizeHint is
+// passed to make as the size hint of the underlying map.
+func (e *MapMarkSetEnv) CreateVisitor(name string, sizeHint int64) (MarkSetVisitor, error) {
+	return &MapMarkSetVisitor{
+		set: make(map[string]struct{}, sizeHint),
+	}, nil
+}
+
 func (e *MapMarkSetEnv) Close() error {
 	return nil
 }
@@ -73,3 +89,25 @@ func (s *MapMarkSet) Close() error {
 func (s *MapMarkSet) SetConcurrent() {
 	s.ts = true
 }
+
+// Visit returns true the first time the multihash of c is seen and false on
+// every later call. A closed visitor returns errMarkSetClosed.
+func (v *MapMarkSetVisitor) Visit(c cid.Cid) (bool, error) {
+	if v.set == nil {
+		return false, errMarkSetClosed
+	}
+
+	key := string(c.Hash())
+	if _, ok := v.set[key]; ok {
+		return false, nil
+	}
+
+	v.set[key] = struct{}{}
+	return true, nil
+}
+
+// Close drops the set; later calls to Visit fail with errMarkSetClosed.
+func (v *MapMarkSetVisitor) Close() error {
+	v.set = nil
+	return nil
+}
diff --git a/blockstore/splitstore/markset_test.go b/blockstore/splitstore/markset_test.go
index 38519949a..93490c4c1 100644
--- a/blockstore/splitstore/markset_test.go
+++ b/blockstore/splitstore/markset_test.go
@@ -2,6 +2,7 @@ package splitstore
 
 import (
 	"io/ioutil"
+	"os"
 	"testing"
 
 	cid "github.com/ipfs/go-cid"
@@ -10,6 +11,7 @@ import (
 
 func TestMapMarkSet(t *testing.T) {
 	testMarkSet(t, "map")
+	testMarkSetVisitor(t, "map")
 }
 
 func TestBloomMarkSet(t *testing.T) {
@@ -23,16 +25,21 @@ func TestBadgerMarkSet(t *testing.T) {
 		badgerMarkSetBatchSize = bs
 	})
 	testMarkSet(t, "badger")
+	testMarkSetVisitor(t, "badger")
 }
 
 func testMarkSet(t *testing.T, lsType string) {
 	t.Helper()
 
-	path, err := ioutil.TempDir("", "sweep-test.*")
+	path, err := ioutil.TempDir("", "markset.*")
 	if err != nil {
 		t.Fatal(err)
 	}
 
+	t.Cleanup(func() {
+		_ = os.RemoveAll(path)
+	})
+
 	env, err := OpenMarkSetEnv(path, lsType)
 	if err != nil {
 		t.Fatal(err)
@@ -145,3 +152,74 @@ func testMarkSet(t *testing.T, lsType string) {
 		t.Fatal(err)
 	}
 }
+
+func testMarkSetVisitor(t *testing.T, lsType string) {
+	t.Helper()
+
+	path, err := ioutil.TempDir("", "markset.*")
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	t.Cleanup(func() {
+		_ = os.RemoveAll(path)
+	})
+
+	env, err := OpenMarkSetEnv(path, lsType)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer env.Close() //nolint:errcheck
+
+	visitor, err := env.CreateVisitor("test", 0)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer visitor.Close() //nolint:errcheck
+
+	makeCid := func(key string) cid.Cid {
+		h, err := multihash.Sum([]byte(key), multihash.SHA2_256, -1)
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		return cid.NewCidV1(cid.Raw, h)
+	}
+
+	mustVisit := func(v ObjectVisitor, cid cid.Cid) {
+		visit, err := v.Visit(cid)
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		if !visit {
+			t.Fatal("object should be visited")
+		}
+	}
+
+	mustNotVisit := func(v ObjectVisitor, cid cid.Cid) {
+		visit, err := v.Visit(cid)
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		if visit {
+			t.Fatal("unexpected visit")
+		}
+	}
+
+	k1 := makeCid("a")
+	k2 := makeCid("b")
+	k3 := makeCid("c")
+	k4 := makeCid("d")
+
+	mustVisit(visitor, k1)
+	mustVisit(visitor, k2)
+	mustVisit(visitor, k3)
+	mustVisit(visitor, k4)
+
+	mustNotVisit(visitor, k1)
+	mustNotVisit(visitor, k2)
+	mustNotVisit(visitor, k3)
+	mustNotVisit(visitor, k4)
+}