diff --git a/blockstore/splitstore/markset_badger.go b/blockstore/splitstore/markset_badger.go index da5596785..ab85a2b41 100644 --- a/blockstore/splitstore/markset_badger.go +++ b/blockstore/splitstore/markset_badger.go @@ -27,6 +27,7 @@ type BadgerMarkSet struct { writing map[int]map[string]struct{} writers int seqno int + version int db *badger.DB path string @@ -136,40 +137,81 @@ func (s *BadgerMarkSet) Has(c cid.Cid) (bool, error) { } func (s *BadgerMarkSet) Visit(c cid.Cid) (bool, error) { - s.mx.Lock() - - if s.pend == nil { - s.mx.Unlock() - return false, errMarkSetClosed - } - key := c.Hash() pendKey := string(key) - if _, ok := s.pend[pendKey]; ok { - s.mx.Unlock() - return false, nil - } - for _, wr := range s.writing { - if _, ok := wr[pendKey]; ok { - s.mx.Unlock() + checkPending := func() (bool, error) { + if s.pend == nil { + return false, errMarkSetClosed + } + + if _, ok := s.pend[pendKey]; ok { return false, nil } + + for _, wr := range s.writing { + if _, ok := wr[pendKey]; ok { + return false, nil + } + } + + return true, nil } - err := s.db.View(func(txn *badger.Txn) error { + s.mx.RLock() + + visit, err := checkPending() + if !visit { + s.mx.RUnlock() + return visit, err + } + + err = s.db.View(func(txn *badger.Txn) error { _, err := txn.Get(key) return err }) switch err { case nil: - s.mx.Unlock() + s.mx.RUnlock() return false, nil case badger.ErrKeyNotFound: - s.pend[pendKey] = struct{}{} + // need to upgrade the lock to exclusive in order to write; take the version count to see + // if there was another write while we were upgrading + version := s.version + s.mx.RUnlock() + s.mx.Lock() + + // we have to do the check dance again + visit, err := checkPending() + if !visit { + s.mx.Unlock() + return visit, err + } + + if version != s.version { + // something was written to the db, we need to check it + err = s.db.View(func(txn *badger.Txn) error { + _, err := txn.Get(key) + return err + }) + + switch err { + case nil: + s.mx.Unlock() + return false, nil + + case badger.ErrKeyNotFound: + + default: + s.mx.Unlock() + return false, xerrors.Errorf("error checking badger markset: %w", err) + } + } + + s.pend[pendKey] = struct{}{} if len(s.pend) < badgerMarkSetBatchSize { s.mx.Unlock() return true, nil @@ -182,7 +224,7 @@ func (s *BadgerMarkSet) Visit(c cid.Cid) (bool, error) { return true, nil default: - s.mx.Unlock() + s.mx.RUnlock() return false, xerrors.Errorf("error checking badger markset: %w", err) } } @@ -202,6 +244,7 @@ func (s *BadgerMarkSet) putPending() error { defer s.mx.Unlock() delete(s.writing, seqno) + s.version++ s.writers-- if s.writers == 0 { s.cond.Broadcast()