unify marksets and visitors

This commit is contained in:
vyzo 2021-07-30 22:07:45 +03:00
parent 3c994d94aa
commit cb3c53664d
3 changed files with 127 additions and 155 deletions

View File

@ -24,6 +24,7 @@ type MarkSet interface {
type MarkSetVisitor interface { type MarkSetVisitor interface {
ObjectVisitor ObjectVisitor
Close() error Close() error
SetConcurrent()
} }
type MarkSetEnv interface { type MarkSetEnv interface {

View File

@ -33,15 +33,7 @@ type BadgerMarkSet struct {
} }
var _ MarkSet = (*BadgerMarkSet)(nil) var _ MarkSet = (*BadgerMarkSet)(nil)
var _ MarkSetVisitor = (*BadgerMarkSet)(nil)
type BadgerMarkSetVisitor struct {
pend map[string]struct{}
db *badger.DB
path string
}
var _ MarkSetVisitor = (*BadgerMarkSetVisitor)(nil)
var badgerMarkSetBatchSize = 16384 var badgerMarkSetBatchSize = 16384
@ -55,7 +47,7 @@ func NewBadgerMarkSetEnv(path string) (MarkSetEnv, error) {
return &BadgerMarkSetEnv{path: msPath}, nil return &BadgerMarkSetEnv{path: msPath}, nil
} }
func (e *BadgerMarkSetEnv) Create(name string, sizeHint int64) (MarkSet, error) { func (e *BadgerMarkSetEnv) create(name string, sizeHint int64) (*BadgerMarkSet, error) {
path := filepath.Join(e.path, name) path := filepath.Join(e.path, name)
db, err := openTransientBadgerDB(path) db, err := openTransientBadgerDB(path)
@ -74,21 +66,12 @@ func (e *BadgerMarkSetEnv) Create(name string, sizeHint int64) (MarkSet, error)
return ms, nil return ms, nil
} }
// Create opens a badger-backed mark set called name under the environment's
// directory and returns it as a MarkSet.
//
// The pointer returned by create is not forwarded directly: converting a nil
// *BadgerMarkSet to the MarkSet interface would produce a non-nil interface
// value wrapping a nil pointer, so on failure an untyped nil is returned
// alongside the error instead.
func (e *BadgerMarkSetEnv) Create(name string, sizeHint int64) (MarkSet, error) {
	ms, err := e.create(name, sizeHint)
	if err != nil {
		return nil, err
	}
	return ms, nil
}
func (e *BadgerMarkSetEnv) CreateVisitor(name string, sizeHint int64) (MarkSetVisitor, error) { func (e *BadgerMarkSetEnv) CreateVisitor(name string, sizeHint int64) (MarkSetVisitor, error) {
path := filepath.Join(e.path, name) return e.create(name, sizeHint)
db, err := openTransientBadgerDB(path)
if err != nil {
return nil, xerrors.Errorf("error creating badger db: %w", err)
}
v := &BadgerMarkSetVisitor{
pend: make(map[string]struct{}),
db: db,
path: path,
}
return v, nil
} }
func (e *BadgerMarkSetEnv) Close() error { func (e *BadgerMarkSetEnv) Close() error {
@ -110,42 +93,7 @@ func (s *BadgerMarkSet) Mark(c cid.Cid) error {
return nil return nil
} }
pend := s.pend return s.putPending()
seqno := s.seqno
s.seqno++
s.writing[seqno] = pend
s.pend = make(map[string]struct{})
s.writers++
s.mx.Unlock()
defer func() {
s.mx.Lock()
defer s.mx.Unlock()
delete(s.writing, seqno)
s.writers--
if s.writers == 0 {
s.cond.Broadcast()
}
}()
empty := []byte{} // not nil
batch := s.db.NewWriteBatch()
defer batch.Cancel()
for k := range pend {
if err := batch.Set([]byte(k), empty); err != nil {
return err
}
}
err := batch.Flush()
if err != nil {
return xerrors.Errorf("error flushing batch to badger markset: %w", err)
}
return nil
} }
func (s *BadgerMarkSet) Has(c cid.Cid) (bool, error) { func (s *BadgerMarkSet) Has(c cid.Cid) (bool, error) {
@ -187,6 +135,98 @@ func (s *BadgerMarkSet) Has(c cid.Cid) (bool, error) {
} }
} }
// Visit records c in the mark set and reports whether this call was the first
// to see it. It returns (true, nil) when the key was newly added, (false, nil)
// when the key was already pending, in a batch being written, or present in
// the badger db, and errMarkSetClosed when the pending map is nil.
//
// s.mx is taken on entry and released explicitly on every return path; the
// batch-flush path hands the held lock to putPending, which releases it.
func (s *BadgerMarkSet) Visit(c cid.Cid) (bool, error) {
s.mx.Lock()
// a nil pending map is reported as a closed mark set
if s.pend == nil {
s.mx.Unlock()
return false, errMarkSetClosed
}
key := c.Hash()
pendKey := string(key)
// already queued in the current pending batch
if _, ok := s.pend[pendKey]; ok {
s.mx.Unlock()
return false, nil
}
// already part of a batch that is currently being written
for _, wr := range s.writing {
if _, ok := wr[pendKey]; ok {
s.mx.Unlock()
return false, nil
}
}
// fall back to the db; the lookup runs with s.mx still held
err := s.db.View(func(txn *badger.Txn) error {
_, err := txn.Get(key)
return err
})
switch err {
case nil:
// key found in the db: previously visited
s.mx.Unlock()
return false, nil
case badger.ErrKeyNotFound:
// first visit: queue the key, flushing once the batch size is reached
s.pend[pendKey] = struct{}{}
if len(s.pend) < badgerMarkSetBatchSize {
s.mx.Unlock()
return true, nil
}
// putPending unlocks s.mx itself, so there is no Unlock on this path
if err := s.putPending(); err != nil {
return false, err
}
return true, nil
default:
s.mx.Unlock()
return false, xerrors.Errorf("error checking badger markset: %w", err)
}
}
// putPending writes the current pending batch to the badger db.
//
// The caller must hold s.mx on entry; putPending releases it before doing the
// write and does NOT hold it on return. While the write is in flight the batch
// is registered in s.writing under a fresh sequence number and s.writers is
// incremented; a deferred cleanup re-takes the lock, removes the batch from
// s.writing, decrements s.writers, and broadcasts on s.cond when the writer
// count drops to zero.
func (s *BadgerMarkSet) putPending() error {
// detach the current batch and install an empty pending map in its place
pend := s.pend
seqno := s.seqno
s.seqno++
s.writing[seqno] = pend
s.pend = make(map[string]struct{})
s.writers++
// release the lock taken by the caller; the db write runs unlocked
s.mx.Unlock()
defer func() {
// runs on both success and error paths
s.mx.Lock()
defer s.mx.Unlock()
delete(s.writing, seqno)
s.writers--
if s.writers == 0 {
s.cond.Broadcast()
}
}()
empty := []byte{} // not nil
batch := s.db.NewWriteBatch()
defer batch.Cancel()
for k := range pend {
if err := batch.Set([]byte(k), empty); err != nil {
return xerrors.Errorf("error setting batch: %w", err)
}
}
err := batch.Flush()
if err != nil {
return xerrors.Errorf("error flushing batch to badger markset: %w", err)
}
return nil
}
func (s *BadgerMarkSet) Close() error { func (s *BadgerMarkSet) Close() error {
s.mx.Lock() s.mx.Lock()
defer s.mx.Unlock() defer s.mx.Unlock()
@ -208,72 +248,6 @@ func (s *BadgerMarkSet) Close() error {
func (s *BadgerMarkSet) SetConcurrent() {} func (s *BadgerMarkSet) SetConcurrent() {}
// Visit reports whether c is being seen for the first time by this visitor.
// A key counts as already seen when it sits in the in-memory pending batch or
// is found in the badger db. New keys are queued in the pending batch, which
// is written to the db once it holds badgerMarkSetBatchSize entries.
// errMarkSetClosed is returned when the pending map is nil.
func (v *BadgerMarkSetVisitor) Visit(c cid.Cid) (bool, error) {
	if v.pend == nil {
		return false, errMarkSetClosed
	}

	hash := c.Hash()
	pendKey := string(hash)
	if _, queued := v.pend[pendKey]; queued {
		return false, nil
	}

	lookupErr := v.db.View(func(txn *badger.Txn) error {
		_, getErr := txn.Get(hash)
		return getErr
	})
	if lookupErr == nil {
		// present in the db: visited before
		return false, nil
	}
	if lookupErr != badger.ErrKeyNotFound {
		return false, xerrors.Errorf("error checking badger markset: %w", lookupErr)
	}

	// first visit: queue the key and flush only when the batch is full
	v.pend[pendKey] = struct{}{}
	if len(v.pend) < badgerMarkSetBatchSize {
		return true, nil
	}

	full := v.pend
	v.pend = make(map[string]struct{})

	empty := []byte{} // not nil
	batch := v.db.NewWriteBatch()
	defer batch.Cancel()

	for k := range full {
		if setErr := batch.Set([]byte(k), empty); setErr != nil {
			return false, setErr
		}
	}

	if flushErr := batch.Flush(); flushErr != nil {
		return false, xerrors.Errorf("error flushing batch to badger markset: %w", flushErr)
	}

	return true, nil
}
// Close drops the pending batch without writing it and hands the db, together
// with the visitor's path, to closeTransientBadgerDB. Calling Close on a
// visitor whose pending map is already nil is a no-op that returns nil.
func (v *BadgerMarkSetVisitor) Close() error {
	if v.pend == nil {
		return nil
	}

	db := v.db
	v.pend, v.db = nil, nil

	return closeTransientBadgerDB(db, v.path)
}
func openTransientBadgerDB(path string) (*badger.DB, error) { func openTransientBadgerDB(path string) (*badger.DB, error) {
// clean up first // clean up first
err := os.RemoveAll(path) err := os.RemoveAll(path)

View File

@ -18,27 +18,24 @@ type MapMarkSet struct {
} }
var _ MarkSet = (*MapMarkSet)(nil) var _ MarkSet = (*MapMarkSet)(nil)
var _ MarkSetVisitor = (*MapMarkSet)(nil)
type MapMarkSetVisitor struct {
set map[string]struct{}
}
var _ MarkSetVisitor = (*MapMarkSetVisitor)(nil)
func NewMapMarkSetEnv() (*MapMarkSetEnv, error) { func NewMapMarkSetEnv() (*MapMarkSetEnv, error) {
return &MapMarkSetEnv{}, nil return &MapMarkSetEnv{}, nil
} }
func (e *MapMarkSetEnv) Create(name string, sizeHint int64) (MarkSet, error) { func (e *MapMarkSetEnv) create(name string, sizeHint int64) (*MapMarkSet, error) {
return &MapMarkSet{ return &MapMarkSet{
set: make(map[string]struct{}, sizeHint), set: make(map[string]struct{}, sizeHint),
}, nil }, nil
} }
// Create returns a new in-memory mark set as a MarkSet; name and sizeHint are
// passed through to create.
//
// The concrete pointer from create is not forwarded directly, so that an
// error can never be paired with a non-nil MarkSet interface wrapping a nil
// *MapMarkSet.
func (e *MapMarkSetEnv) Create(name string, sizeHint int64) (MarkSet, error) {
	ms, err := e.create(name, sizeHint)
	if err != nil {
		return nil, err
	}
	return ms, nil
}
func (e *MapMarkSetEnv) CreateVisitor(name string, sizeHint int64) (MarkSetVisitor, error) { func (e *MapMarkSetEnv) CreateVisitor(name string, sizeHint int64) (MarkSetVisitor, error) {
return &MapMarkSetVisitor{ return e.create(name, sizeHint)
set: make(map[string]struct{}, sizeHint),
}, nil
} }
func (e *MapMarkSetEnv) Close() error { func (e *MapMarkSetEnv) Close() error {
@ -73,6 +70,25 @@ func (s *MapMarkSet) Has(cid cid.Cid) (bool, error) {
return ok, nil return ok, nil
} }
// Visit adds c to the set and reports whether it was absent beforehand:
// (true, nil) on the first visit, (false, nil) on any later one. The mutex is
// taken only when the set has been switched to concurrent mode (s.ts).
// errMarkSetClosed is returned when the underlying map is nil.
func (s *MapMarkSet) Visit(c cid.Cid) (bool, error) {
	if s.ts {
		s.mx.Lock()
		defer s.mx.Unlock()
	}

	if s.set == nil {
		return false, errMarkSetClosed
	}

	hashKey := string(c.Hash())
	_, seen := s.set[hashKey]
	if !seen {
		s.set[hashKey] = struct{}{}
	}

	return !seen, nil
}
func (s *MapMarkSet) Close() error { func (s *MapMarkSet) Close() error {
if s.ts { if s.ts {
s.mx.Lock() s.mx.Lock()
@ -85,22 +101,3 @@ func (s *MapMarkSet) Close() error {
func (s *MapMarkSet) SetConcurrent() { func (s *MapMarkSet) SetConcurrent() {
s.ts = true s.ts = true
} }
// Visit adds c to the visitor's set and reports whether it was absent
// beforehand: (true, nil) on the first visit, (false, nil) afterwards.
// errMarkSetClosed is returned when the underlying map is nil. No locking is
// performed here.
func (v *MapMarkSetVisitor) Visit(c cid.Cid) (bool, error) {
	if v.set == nil {
		return false, errMarkSetClosed
	}

	hashKey := string(c.Hash())
	_, seen := v.set[hashKey]
	if !seen {
		v.set[hashKey] = struct{}{}
	}

	return !seen, nil
}
// Close releases the visitor's set by dropping the map reference; subsequent
// Visit calls then observe a nil map. It always returns nil.
func (v *MapMarkSetVisitor) Close() error {
	v.set = nil

	return nil
}