improve concurrency properties of Visit with optimistic concurrency

This commit is contained in:
vyzo 2021-07-31 00:13:56 +03:00
parent 563fa1e31a
commit 57c984cea1

View File

@ -27,6 +27,7 @@ type BadgerMarkSet struct {
writing map[int]map[string]struct{} writing map[int]map[string]struct{}
writers int writers int
seqno int seqno int
version int
db *badger.DB db *badger.DB
path string path string
@ -136,40 +137,81 @@ func (s *BadgerMarkSet) Has(c cid.Cid) (bool, error) {
} }
func (s *BadgerMarkSet) Visit(c cid.Cid) (bool, error) { func (s *BadgerMarkSet) Visit(c cid.Cid) (bool, error) {
s.mx.Lock()
if s.pend == nil {
s.mx.Unlock()
return false, errMarkSetClosed
}
key := c.Hash() key := c.Hash()
pendKey := string(key) pendKey := string(key)
if _, ok := s.pend[pendKey]; ok {
s.mx.Unlock()
return false, nil
}
for _, wr := range s.writing { checkPending := func() (bool, error) {
if _, ok := wr[pendKey]; ok { if s.pend == nil {
s.mx.Unlock() return false, errMarkSetClosed
}
if _, ok := s.pend[pendKey]; ok {
return false, nil return false, nil
} }
for _, wr := range s.writing {
if _, ok := wr[pendKey]; ok {
return false, nil
}
}
return true, nil
} }
err := s.db.View(func(txn *badger.Txn) error { s.mx.RLock()
visit, err := checkPending()
if !visit {
s.mx.RUnlock()
return visit, err
}
err = s.db.View(func(txn *badger.Txn) error {
_, err := txn.Get(key) _, err := txn.Get(key)
return err return err
}) })
switch err { switch err {
case nil: case nil:
s.mx.Unlock() s.mx.RUnlock()
return false, nil return false, nil
case badger.ErrKeyNotFound: case badger.ErrKeyNotFound:
s.pend[pendKey] = struct{}{} // need to upgrade the lock to exclusive in order to write; take the version count to see
// if there was another write while we were upgrading
version := s.version
s.mx.RUnlock()
s.mx.Lock()
// we have to do the check dance again
visit, err := checkPending()
if !visit {
s.mx.Unlock()
return visit, err
}
if version != s.version {
// something was written to the db, we need to check it
err = s.db.View(func(txn *badger.Txn) error {
_, err := txn.Get(key)
return err
})
switch err {
case nil:
s.mx.Unlock()
return false, nil
case badger.ErrKeyNotFound:
default:
s.mx.Unlock()
return false, xerrors.Errorf("error checking badger markset: %w", err)
}
}
s.pend[pendKey] = struct{}{}
if len(s.pend) < badgerMarkSetBatchSize { if len(s.pend) < badgerMarkSetBatchSize {
s.mx.Unlock() s.mx.Unlock()
return true, nil return true, nil
@ -182,7 +224,7 @@ func (s *BadgerMarkSet) Visit(c cid.Cid) (bool, error) {
return true, nil return true, nil
default: default:
s.mx.Unlock() s.mx.RUnlock()
return false, xerrors.Errorf("error checking badger markset: %w", err) return false, xerrors.Errorf("error checking badger markset: %w", err)
} }
} }
@ -202,6 +244,7 @@ func (s *BadgerMarkSet) putPending() error {
defer s.mx.Unlock() defer s.mx.Unlock()
delete(s.writing, seqno) delete(s.writing, seqno)
s.version++
s.writers-- s.writers--
if s.writers == 0 { if s.writers == 0 {
s.cond.Broadcast() s.cond.Broadcast()