make markSets synchronous in critical section
This commit is contained in:
parent
cf09dd044a
commit
322b85898f
@ -117,10 +117,18 @@ func (s *BadgerMarkSet) BeginCriticalSection() error {
|
|||||||
s.mx.Unlock()
|
s.mx.Unlock()
|
||||||
|
|
||||||
if write {
|
if write {
|
||||||
|
// all writes sync once perist is true
|
||||||
return s.write(seqno)
|
return s.write(seqno)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
// wait for any pending writes and sync
|
||||||
|
s.mx.Lock()
|
||||||
|
for s.writers > 0 {
|
||||||
|
s.cond.Wait()
|
||||||
|
}
|
||||||
|
s.mx.Unlock()
|
||||||
|
|
||||||
|
return s.db.Sync()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *BadgerMarkSet) EndCriticalSection() {
|
func (s *BadgerMarkSet) EndCriticalSection() {
|
||||||
@ -341,6 +349,14 @@ func (s *BadgerMarkSet) write(seqno int) (err error) {
|
|||||||
return xerrors.Errorf("error flushing batch to badger markset: %w", err)
|
return xerrors.Errorf("error flushing batch to badger markset: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
s.mx.RLock()
|
||||||
|
persist := s.persist
|
||||||
|
s.mx.RUnlock()
|
||||||
|
|
||||||
|
if persist {
|
||||||
|
return s.db.Sync()
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -163,7 +163,13 @@ func (s *MapMarkSet) Mark(c cid.Cid) error {
|
|||||||
s.set[string(hash)] = struct{}{}
|
s.set[string(hash)] = struct{}{}
|
||||||
|
|
||||||
if s.persist {
|
if s.persist {
|
||||||
return s.writeKey(hash, true)
|
if err := s.writeKey(hash, true); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := s.file.Sync(); err != nil {
|
||||||
|
return xerrors.Errorf("error syncing markset: %w", err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
@ -189,7 +195,13 @@ func (s *MapMarkSet) MarkMany(batch []cid.Cid) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if s.persist {
|
if s.persist {
|
||||||
return s.buf.Flush()
|
if err := s.buf.Flush(); err != nil {
|
||||||
|
return xerrors.Errorf("error flushing markset buffer to disk: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := s.file.Sync(); err != nil {
|
||||||
|
return xerrors.Errorf("error syncing markset: %w", err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
@ -227,6 +239,9 @@ func (s *MapMarkSet) Visit(c cid.Cid) (bool, error) {
|
|||||||
if err := s.writeKey(hash, true); err != nil {
|
if err := s.writeKey(hash, true); err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
|
if err := s.file.Sync(); err != nil {
|
||||||
|
return false, xerrors.Errorf("error syncing markset: %w", err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return true, nil
|
return true, nil
|
||||||
|
Loading…
Reference in New Issue
Block a user