From 322b85898f7ac07f6f81ae9b6dff52aee6bff3d8 Mon Sep 17 00:00:00 2001 From: vyzo Date: Sun, 30 Jan 2022 12:10:08 +0200 Subject: [PATCH] make markSets synchronous in critical section --- blockstore/splitstore/markset_badger.go | 18 +++++++++++++++++- blockstore/splitstore/markset_map.go | 19 +++++++++++++++++-- 2 files changed, 34 insertions(+), 3 deletions(-) diff --git a/blockstore/splitstore/markset_badger.go b/blockstore/splitstore/markset_badger.go index 95ee60e9b..5b7eb471a 100644 --- a/blockstore/splitstore/markset_badger.go +++ b/blockstore/splitstore/markset_badger.go @@ -117,10 +117,18 @@ func (s *BadgerMarkSet) BeginCriticalSection() error { s.mx.Unlock() if write { + // all writes sync once perist is true return s.write(seqno) } - return nil + // wait for any pending writes and sync + s.mx.Lock() + for s.writers > 0 { + s.cond.Wait() + } + s.mx.Unlock() + + return s.db.Sync() } func (s *BadgerMarkSet) EndCriticalSection() { @@ -341,6 +349,14 @@ func (s *BadgerMarkSet) write(seqno int) (err error) { return xerrors.Errorf("error flushing batch to badger markset: %w", err) } + s.mx.RLock() + persist := s.persist + s.mx.RUnlock() + + if persist { + return s.db.Sync() + } + return nil } diff --git a/blockstore/splitstore/markset_map.go b/blockstore/splitstore/markset_map.go index b046b5134..8b088e70a 100644 --- a/blockstore/splitstore/markset_map.go +++ b/blockstore/splitstore/markset_map.go @@ -163,7 +163,13 @@ func (s *MapMarkSet) Mark(c cid.Cid) error { s.set[string(hash)] = struct{}{} if s.persist { - return s.writeKey(hash, true) + if err := s.writeKey(hash, true); err != nil { + return err + } + + if err := s.file.Sync(); err != nil { + return xerrors.Errorf("error syncing markset: %w", err) + } } return nil @@ -189,7 +195,13 @@ func (s *MapMarkSet) MarkMany(batch []cid.Cid) error { } if s.persist { - return s.buf.Flush() + if err := s.buf.Flush(); err != nil { + return xerrors.Errorf("error flushing markset buffer to disk: %w", err) + } + + if err := s.file.Sync(); err != nil { + return xerrors.Errorf("error syncing markset: %w", err) + } } return nil @@ -227,6 +239,9 @@ func (s *MapMarkSet) Visit(c cid.Cid) (bool, error) { if err := s.writeKey(hash, true); err != nil { return false, err } + if err := s.file.Sync(); err != nil { + return false, xerrors.Errorf("error syncing markset: %w", err) + } } return true, nil