simplify and deduplicate Has/Visit using helper methods tryPending and tryDB.
This commit is contained in:
parent
380e16d465
commit
26a5832f92
@ -100,71 +100,91 @@ func (s *BadgerMarkSet) Has(c cid.Cid) (bool, error) {
|
||||
s.mx.RLock()
|
||||
defer s.mx.RUnlock()
|
||||
|
||||
if s.pend == nil {
|
||||
return false, errMarkSetClosed
|
||||
}
|
||||
|
||||
key := c.Hash()
|
||||
pendKey := string(key)
|
||||
_, ok := s.pend[pendKey]
|
||||
if ok {
|
||||
return true, nil
|
||||
|
||||
has, err := s.tryPending(pendKey)
|
||||
if has || err != nil {
|
||||
return has, err
|
||||
}
|
||||
|
||||
for _, wr := range s.writing {
|
||||
_, ok := wr[pendKey]
|
||||
if ok {
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
|
||||
err := s.db.View(func(txn *badger.Txn) error {
|
||||
_, err := txn.Get(key)
|
||||
return err
|
||||
})
|
||||
|
||||
switch err {
|
||||
case nil:
|
||||
return true, nil
|
||||
|
||||
case badger.ErrKeyNotFound:
|
||||
return false, nil
|
||||
|
||||
default:
|
||||
return false, xerrors.Errorf("error checking badger markset: %w", err)
|
||||
}
|
||||
return s.tryDB(key)
|
||||
}
|
||||
|
||||
func (s *BadgerMarkSet) Visit(c cid.Cid) (bool, error) {
|
||||
key := c.Hash()
|
||||
pendKey := string(key)
|
||||
|
||||
checkPending := func() (bool, error) {
|
||||
if s.pend == nil {
|
||||
return false, errMarkSetClosed
|
||||
}
|
||||
s.mx.RLock()
|
||||
|
||||
if _, ok := s.pend[pendKey]; ok {
|
||||
return false, nil
|
||||
}
|
||||
has, err := s.tryPending(pendKey)
|
||||
if has || err != nil {
|
||||
s.mx.RUnlock()
|
||||
return false, err
|
||||
}
|
||||
|
||||
for _, wr := range s.writing {
|
||||
if _, ok := wr[pendKey]; ok {
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
has, err = s.tryDB(key)
|
||||
if has || err != nil {
|
||||
s.mx.RUnlock()
|
||||
return false, err
|
||||
}
|
||||
|
||||
// we need to upgrade the lock to exclusive in order to write; take the version count to see
|
||||
// if there was another write while we were upgrading
|
||||
version := s.version
|
||||
s.mx.RUnlock()
|
||||
|
||||
s.mx.Lock()
|
||||
|
||||
// we have to do the check dance again
|
||||
has, err = s.tryPending(pendKey)
|
||||
if has || err != nil {
|
||||
s.mx.Unlock()
|
||||
return false, err
|
||||
}
|
||||
|
||||
if version != s.version {
|
||||
// something was written to the db, we need to check it
|
||||
has, err = s.tryDB(key)
|
||||
if has || err != nil {
|
||||
s.mx.Unlock()
|
||||
return false, err
|
||||
}
|
||||
}
|
||||
|
||||
s.pend[pendKey] = struct{}{}
|
||||
if len(s.pend) < badgerMarkSetBatchSize {
|
||||
s.mx.Unlock()
|
||||
return true, nil
|
||||
}
|
||||
|
||||
s.mx.RLock()
|
||||
|
||||
visit, err := checkPending()
|
||||
if !visit {
|
||||
s.mx.RUnlock()
|
||||
return visit, err
|
||||
if err := s.putPending(); err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// reader holds the (r)lock
|
||||
func (s *BadgerMarkSet) tryPending(key string) (has bool, err error) {
|
||||
if s.pend == nil {
|
||||
return false, errMarkSetClosed
|
||||
}
|
||||
|
||||
if _, ok := s.pend[key]; ok {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
for _, wr := range s.writing {
|
||||
if _, ok := wr[key]; ok {
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func (s *BadgerMarkSet) tryDB(key []byte) (has bool, err error) {
|
||||
err = s.db.View(func(txn *badger.Txn) error {
|
||||
_, err := txn.Get(key)
|
||||
return err
|
||||
@ -172,63 +192,17 @@ func (s *BadgerMarkSet) Visit(c cid.Cid) (bool, error) {
|
||||
|
||||
switch err {
|
||||
case nil:
|
||||
s.mx.RUnlock()
|
||||
return false, nil
|
||||
|
||||
case badger.ErrKeyNotFound:
|
||||
// need to upgrade the lock to exclusive in order to write; take the version count to see
|
||||
// if there was another write while we were upgrading
|
||||
version := s.version
|
||||
s.mx.RUnlock()
|
||||
|
||||
s.mx.Lock()
|
||||
|
||||
// we have to do the check dance again
|
||||
visit, err := checkPending()
|
||||
if !visit {
|
||||
s.mx.Unlock()
|
||||
return visit, err
|
||||
}
|
||||
|
||||
if version != s.version {
|
||||
// something was written to the db, we need to check it
|
||||
err = s.db.View(func(txn *badger.Txn) error {
|
||||
_, err := txn.Get(key)
|
||||
return err
|
||||
})
|
||||
|
||||
switch err {
|
||||
case nil:
|
||||
s.mx.Unlock()
|
||||
return false, nil
|
||||
|
||||
case badger.ErrKeyNotFound:
|
||||
|
||||
default:
|
||||
s.mx.Unlock()
|
||||
return false, xerrors.Errorf("error checking badger markset: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
s.pend[pendKey] = struct{}{}
|
||||
if len(s.pend) < badgerMarkSetBatchSize {
|
||||
s.mx.Unlock()
|
||||
return true, nil
|
||||
}
|
||||
|
||||
if err := s.putPending(); err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
return true, nil
|
||||
|
||||
case badger.ErrKeyNotFound:
|
||||
return false, nil
|
||||
|
||||
default:
|
||||
s.mx.RUnlock()
|
||||
return false, xerrors.Errorf("error checking badger markset: %w", err)
|
||||
return false, err
|
||||
}
|
||||
}
|
||||
|
||||
// writer holds the lock
|
||||
// writer holds the exclusive lock; putPending releases it
|
||||
func (s *BadgerMarkSet) putPending() error {
|
||||
pend := s.pend
|
||||
seqno := s.seqno
|
||||
|
Loading…
Reference in New Issue
Block a user