make badger markset concurrent close safe

This commit is contained in:
vyzo 2021-07-22 13:54:50 +03:00
parent f2b7c3e6f2
commit 2891a31c99

View File

@ -22,7 +22,9 @@ var _ MarkSetEnv = (*BadgerMarkSetEnv)(nil)
type BadgerMarkSet struct { type BadgerMarkSet struct {
mx sync.RWMutex mx sync.RWMutex
cond sync.Cond
pend map[string]struct{} pend map[string]struct{}
writers int
db *badger.DB db *badger.DB
path string path string
@ -69,11 +71,14 @@ func (e *BadgerMarkSetEnv) Create(name string, sizeHint int64) (MarkSet, error)
return nil, xerrors.Errorf("error creating badger markset: %w", err) return nil, xerrors.Errorf("error creating badger markset: %w", err)
} }
return &BadgerMarkSet{ ms := &BadgerMarkSet{
pend: make(map[string]struct{}), pend: make(map[string]struct{}),
db: db, db: db,
path: path, path: path,
}, nil }
ms.cond.L = &ms.mx
return ms, nil
} }
func (e *BadgerMarkSetEnv) Close() error { func (e *BadgerMarkSetEnv) Close() error {
@ -97,12 +102,21 @@ func (s *BadgerMarkSet) Mark(c cid.Cid) error {
pend := s.pend pend := s.pend
s.pend = make(map[string]struct{}) s.pend = make(map[string]struct{})
db := s.db s.writers++
s.mx.Unlock() s.mx.Unlock()
defer func() {
s.mx.Lock()
s.writers--
if s.writers == 0 {
s.cond.Broadcast()
}
s.mx.Unlock()
}()
empty := []byte{} // not nil empty := []byte{} // not nil
batch := db.NewWriteBatch() batch := s.db.NewWriteBatch()
defer batch.Cancel() defer batch.Cancel()
for k := range pend { for k := range pend {
@ -158,6 +172,10 @@ func (s *BadgerMarkSet) Close() error {
return nil return nil
} }
for s.writers > 0 {
s.cond.Wait()
}
s.pend = nil s.pend = nil
db := s.db db := s.db
s.db = nil s.db = nil