diff --git a/blockstore/splitstore/markset_badger.go b/blockstore/splitstore/markset_badger.go index 9e76dfb09..1593b150d 100644 --- a/blockstore/splitstore/markset_badger.go +++ b/blockstore/splitstore/markset_badger.go @@ -100,71 +100,91 @@ func (s *BadgerMarkSet) Has(c cid.Cid) (bool, error) { s.mx.RLock() defer s.mx.RUnlock() - if s.pend == nil { - return false, errMarkSetClosed - } - key := c.Hash() pendKey := string(key) - _, ok := s.pend[pendKey] - if ok { - return true, nil + + has, err := s.tryPending(pendKey) + if has || err != nil { + return has, err } - for _, wr := range s.writing { - _, ok := wr[pendKey] - if ok { - return true, nil - } - } - - err := s.db.View(func(txn *badger.Txn) error { - _, err := txn.Get(key) - return err - }) - - switch err { - case nil: - return true, nil - - case badger.ErrKeyNotFound: - return false, nil - - default: - return false, xerrors.Errorf("error checking badger markset: %w", err) - } + return s.tryDB(key) } func (s *BadgerMarkSet) Visit(c cid.Cid) (bool, error) { key := c.Hash() pendKey := string(key) - checkPending := func() (bool, error) { - if s.pend == nil { - return false, errMarkSetClosed - } + s.mx.RLock() - if _, ok := s.pend[pendKey]; ok { - return false, nil - } + has, err := s.tryPending(pendKey) + if has || err != nil { + s.mx.RUnlock() + return false, err + } - for _, wr := range s.writing { - if _, ok := wr[pendKey]; ok { - return false, nil - } - } + has, err = s.tryDB(key) + if has || err != nil { + s.mx.RUnlock() + return false, err + } + // we need to upgrade the lock to exclusive in order to write; take the version count to see + // if there was another write while we were upgrading + version := s.version + s.mx.RUnlock() + + s.mx.Lock() + + // we have to do the check dance again + has, err = s.tryPending(pendKey) + if has || err != nil { + s.mx.Unlock() + return false, err + } + + if version != s.version { + // something was written to the db, we need to check it + has, err = s.tryDB(key) + if has || err != nil { + s.mx.Unlock() + return false, err + } + } + + s.pend[pendKey] = struct{}{} + if len(s.pend) < badgerMarkSetBatchSize { + s.mx.Unlock() return true, nil } - s.mx.RLock() - - visit, err := checkPending() - if !visit { - s.mx.RUnlock() - return visit, err + if err := s.putPending(); err != nil { + return false, err } + return true, nil +} + +// reader holds the (r)lock +func (s *BadgerMarkSet) tryPending(key string) (has bool, err error) { + if s.pend == nil { + return false, errMarkSetClosed + } + + if _, ok := s.pend[key]; ok { + return true, nil + } + + for _, wr := range s.writing { + if _, ok := wr[key]; ok { + return true, nil + } + } + + return false, nil +} + +func (s *BadgerMarkSet) tryDB(key []byte) (has bool, err error) { err = s.db.View(func(txn *badger.Txn) error { _, err := txn.Get(key) return err @@ -172,63 +192,17 @@ func (s *BadgerMarkSet) Visit(c cid.Cid) (bool, error) { switch err { case nil: - s.mx.RUnlock() - return false, nil - - case badger.ErrKeyNotFound: - // need to upgrade the lock to exclusive in order to write; take the version count to see - // if there was another write while we were upgrading - version := s.version - s.mx.RUnlock() - - s.mx.Lock() - - // we have to do the check dance again - visit, err := checkPending() - if !visit { - s.mx.Unlock() - return visit, err - } - - if version != s.version { - // something was written to the db, we need to check it - err = s.db.View(func(txn *badger.Txn) error { - _, err := txn.Get(key) - return err - }) - - switch err { - case nil: - s.mx.Unlock() - return false, nil - - case badger.ErrKeyNotFound: - - default: - s.mx.Unlock() - return false, xerrors.Errorf("error checking badger markset: %w", err) - } - } - - s.pend[pendKey] = struct{}{} - if len(s.pend) < badgerMarkSetBatchSize { - s.mx.Unlock() - return true, nil - } - - if err := s.putPending(); err != nil { - return false, err - } - return true, nil + case badger.ErrKeyNotFound: + return false, nil + default: - s.mx.RUnlock() - return false, xerrors.Errorf("error checking badger markset: %w", err) + return false, err } } -// writer holds the lock +// writer holds the exclusive lock; putPending releases it func (s *BadgerMarkSet) putPending() error { pend := s.pend seqno := s.seqno