From cb3c53664ddc882aa65a1b79366ece3054db8ca9 Mon Sep 17 00:00:00 2001 From: vyzo Date: Fri, 30 Jul 2021 22:07:45 +0300 Subject: [PATCH] unify marksets and visitors --- blockstore/splitstore/markset.go | 1 + blockstore/splitstore/markset_badger.go | 226 +++++++++++------------- blockstore/splitstore/markset_map.go | 55 +++--- 3 files changed, 127 insertions(+), 155 deletions(-) diff --git a/blockstore/splitstore/markset.go b/blockstore/splitstore/markset.go index b75be99c4..704a3793e 100644 --- a/blockstore/splitstore/markset.go +++ b/blockstore/splitstore/markset.go @@ -24,6 +24,7 @@ type MarkSet interface { type MarkSetVisitor interface { ObjectVisitor Close() error + SetConcurrent() } type MarkSetEnv interface { diff --git a/blockstore/splitstore/markset_badger.go b/blockstore/splitstore/markset_badger.go index 9bfe26038..da5596785 100644 --- a/blockstore/splitstore/markset_badger.go +++ b/blockstore/splitstore/markset_badger.go @@ -33,15 +33,7 @@ type BadgerMarkSet struct { } var _ MarkSet = (*BadgerMarkSet)(nil) - -type BadgerMarkSetVisitor struct { - pend map[string]struct{} - - db *badger.DB - path string -} - -var _ MarkSetVisitor = (*BadgerMarkSetVisitor)(nil) +var _ MarkSetVisitor = (*BadgerMarkSet)(nil) var badgerMarkSetBatchSize = 16384 @@ -55,7 +47,7 @@ func NewBadgerMarkSetEnv(path string) (MarkSetEnv, error) { return &BadgerMarkSetEnv{path: msPath}, nil } -func (e *BadgerMarkSetEnv) Create(name string, sizeHint int64) (MarkSet, error) { +func (e *BadgerMarkSetEnv) create(name string, sizeHint int64) (*BadgerMarkSet, error) { path := filepath.Join(e.path, name) db, err := openTransientBadgerDB(path) @@ -74,21 +66,12 @@ func (e *BadgerMarkSetEnv) Create(name string, sizeHint int64) (MarkSet, error) return ms, nil } +func (e *BadgerMarkSetEnv) Create(name string, sizeHint int64) (MarkSet, error) { + return e.create(name, sizeHint) +} + func (e *BadgerMarkSetEnv) CreateVisitor(name string, sizeHint int64) (MarkSetVisitor, error) { - path := 
filepath.Join(e.path, name) - - db, err := openTransientBadgerDB(path) - if err != nil { - return nil, xerrors.Errorf("error creating badger db: %w", err) - } - - v := &BadgerMarkSetVisitor{ - pend: make(map[string]struct{}), - db: db, - path: path, - } - - return v, nil + return e.create(name, sizeHint) } func (e *BadgerMarkSetEnv) Close() error { @@ -110,42 +93,7 @@ func (s *BadgerMarkSet) Mark(c cid.Cid) error { return nil } - pend := s.pend - seqno := s.seqno - s.seqno++ - s.writing[seqno] = pend - s.pend = make(map[string]struct{}) - s.writers++ - s.mx.Unlock() - - defer func() { - s.mx.Lock() - defer s.mx.Unlock() - - delete(s.writing, seqno) - s.writers-- - if s.writers == 0 { - s.cond.Broadcast() - } - }() - - empty := []byte{} // not nil - - batch := s.db.NewWriteBatch() - defer batch.Cancel() - - for k := range pend { - if err := batch.Set([]byte(k), empty); err != nil { - return err - } - } - - err := batch.Flush() - if err != nil { - return xerrors.Errorf("error flushing batch to badger markset: %w", err) - } - - return nil + return s.putPending() } func (s *BadgerMarkSet) Has(c cid.Cid) (bool, error) { @@ -187,6 +135,98 @@ func (s *BadgerMarkSet) Has(c cid.Cid) (bool, error) { } } +func (s *BadgerMarkSet) Visit(c cid.Cid) (bool, error) { + s.mx.Lock() + + if s.pend == nil { + s.mx.Unlock() + return false, errMarkSetClosed + } + + key := c.Hash() + pendKey := string(key) + if _, ok := s.pend[pendKey]; ok { + s.mx.Unlock() + return false, nil + } + + for _, wr := range s.writing { + if _, ok := wr[pendKey]; ok { + s.mx.Unlock() + return false, nil + } + } + + err := s.db.View(func(txn *badger.Txn) error { + _, err := txn.Get(key) + return err + }) + + switch err { + case nil: + s.mx.Unlock() + return false, nil + + case badger.ErrKeyNotFound: + s.pend[pendKey] = struct{}{} + + if len(s.pend) < badgerMarkSetBatchSize { + s.mx.Unlock() + return true, nil + } + + if err := s.putPending(); err != nil { + return false, err + } + + return true, nil + + 
default: + s.mx.Unlock() + return false, xerrors.Errorf("error checking badger markset: %w", err) + } +} + +// caller must hold s.mx; putPending releases it before writing the batch and returns with it unlocked +func (s *BadgerMarkSet) putPending() error { + pend := s.pend + seqno := s.seqno + s.seqno++ + s.writing[seqno] = pend + s.pend = make(map[string]struct{}) + s.writers++ + s.mx.Unlock() + + defer func() { + s.mx.Lock() + defer s.mx.Unlock() + + delete(s.writing, seqno) + s.writers-- + if s.writers == 0 { + s.cond.Broadcast() + } + }() + + empty := []byte{} // not nil + + batch := s.db.NewWriteBatch() + defer batch.Cancel() + + for k := range pend { + if err := batch.Set([]byte(k), empty); err != nil { + return xerrors.Errorf("error setting batch: %w", err) + } + } + + err := batch.Flush() + if err != nil { + return xerrors.Errorf("error flushing batch to badger markset: %w", err) + } + + return nil +} + func (s *BadgerMarkSet) Close() error { s.mx.Lock() defer s.mx.Unlock() @@ -208,72 +248,6 @@ func (s *BadgerMarkSet) Close() error { func (s *BadgerMarkSet) SetConcurrent() {} -func (v *BadgerMarkSetVisitor) Visit(c cid.Cid) (bool, error) { - if v.pend == nil { - return false, errMarkSetClosed - } - - key := c.Hash() - pendKey := string(key) - _, ok := v.pend[pendKey] - if ok { - return false, nil - } - - err := v.db.View(func(txn *badger.Txn) error { - _, err := txn.Get(key) - return err - }) - - switch err { - case nil: - return false, nil - - case badger.ErrKeyNotFound: - v.pend[pendKey] = struct{}{} - - if len(v.pend) < badgerMarkSetBatchSize { - return true, nil - } - - pend := v.pend - v.pend = make(map[string]struct{}) - - empty := []byte{} // not nil - - batch := v.db.NewWriteBatch() - defer batch.Cancel() - - for k := range pend { - if err := batch.Set([]byte(k), empty); err != nil { - return false, err - } - } - - err := batch.Flush() - if err != nil { - return false, xerrors.Errorf("error flushing batch to badger markset: %w", err) - } - - return true, nil - - default: - return false, xerrors.Errorf("error checking badger 
markset: %w", err) - } -} - -func (v *BadgerMarkSetVisitor) Close() error { - if v.pend == nil { - return nil - } - - v.pend = nil - db := v.db - v.db = nil - - return closeTransientBadgerDB(db, v.path) -} - func openTransientBadgerDB(path string) (*badger.DB, error) { // clean up first err := os.RemoveAll(path) diff --git a/blockstore/splitstore/markset_map.go b/blockstore/splitstore/markset_map.go index 8ec7b428b..81c9f1620 100644 --- a/blockstore/splitstore/markset_map.go +++ b/blockstore/splitstore/markset_map.go @@ -18,27 +18,24 @@ type MapMarkSet struct { } var _ MarkSet = (*MapMarkSet)(nil) - -type MapMarkSetVisitor struct { - set map[string]struct{} -} - -var _ MarkSetVisitor = (*MapMarkSetVisitor)(nil) +var _ MarkSetVisitor = (*MapMarkSet)(nil) func NewMapMarkSetEnv() (*MapMarkSetEnv, error) { return &MapMarkSetEnv{}, nil } -func (e *MapMarkSetEnv) Create(name string, sizeHint int64) (MarkSet, error) { +func (e *MapMarkSetEnv) create(name string, sizeHint int64) (*MapMarkSet, error) { return &MapMarkSet{ set: make(map[string]struct{}, sizeHint), }, nil } +func (e *MapMarkSetEnv) Create(name string, sizeHint int64) (MarkSet, error) { + return e.create(name, sizeHint) +} + func (e *MapMarkSetEnv) CreateVisitor(name string, sizeHint int64) (MarkSetVisitor, error) { - return &MapMarkSetVisitor{ - set: make(map[string]struct{}, sizeHint), - }, nil + return e.create(name, sizeHint) } func (e *MapMarkSetEnv) Close() error { @@ -73,6 +70,25 @@ func (s *MapMarkSet) Has(cid cid.Cid) (bool, error) { return ok, nil } +func (s *MapMarkSet) Visit(c cid.Cid) (bool, error) { + if s.ts { + s.mx.Lock() + defer s.mx.Unlock() + } + + if s.set == nil { + return false, errMarkSetClosed + } + + key := string(c.Hash()) + if _, ok := s.set[key]; ok { + return false, nil + } + + s.set[key] = struct{}{} + return true, nil +} + func (s *MapMarkSet) Close() error { if s.ts { s.mx.Lock() @@ -85,22 +101,3 @@ func (s *MapMarkSet) Close() error { func (s *MapMarkSet) SetConcurrent() { 
s.ts = true } - -func (v *MapMarkSetVisitor) Visit(c cid.Cid) (bool, error) { - if v.set == nil { - return false, errMarkSetClosed - } - - key := string(c.Hash()) - if _, ok := v.set[key]; ok { - return false, nil - } - - v.set[key] = struct{}{} - return true, nil -} - -func (v *MapMarkSetVisitor) Close() error { - v.set = nil - return nil -}