From 257423e917ffb929907f2309df77c2ed3f62f32a Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 13 Jul 2021 09:01:50 +0300 Subject: [PATCH] fix view waiting issues with the WaitGroup We can add after Wait is called, which is problematic with WaitGroups. This instead uses a mx/cond combo and waits while the count is > 0. The only downside is that we might needlessly wait for (a bunch) of views that started while the txn is active, but we can live with that. --- blockstore/splitstore/splitstore.go | 61 +++++++++++++++++++---------- 1 file changed, 40 insertions(+), 21 deletions(-) diff --git a/blockstore/splitstore/splitstore.go b/blockstore/splitstore/splitstore.go index 3adc02fa9..563b14bc4 100644 --- a/blockstore/splitstore/splitstore.go +++ b/blockstore/splitstore/splitstore.go @@ -146,13 +146,15 @@ type SplitStore struct { debug *debugLog // transactional protection for concurrent read/writes during compaction - txnLk sync.RWMutex - txnActive bool - txnViews sync.WaitGroup - txnProtect MarkSet - txnRefsMx sync.Mutex - txnRefs map[cid.Cid]struct{} - txnMissing map[cid.Cid]struct{} + txnLk sync.RWMutex + txnViewsMx sync.Mutex + txnViewsCond sync.Cond + txnViews int + txnActive bool + txnProtect MarkSet + txnRefsMx sync.Mutex + txnRefs map[cid.Cid]struct{} + txnMissing map[cid.Cid]struct{} } var _ bstore.Blockstore = (*SplitStore)(nil) @@ -199,6 +201,7 @@ func Open(path string, ds dstore.Datastore, hot, cold bstore.Blockstore, cfg *Co coldPurgeSize: defaultColdPurgeSize, } + ss.txnViewsCond.L = &ss.txnViewsMx ss.ctx, ss.cancel = context.WithCancel(context.Background()) if enableDebugLog { @@ -444,10 +447,8 @@ func (s *SplitStore) View(cid cid.Cid, cb func([]byte) error) error { // view can't have its data pointer deleted, which would be catastrophic. // Note that we can't just RLock for the duration of the view, as this could // lead to deadlock with recursive views. - wg := s.protectView(cid) - if wg != nil { - defer wg.Done() - } + s.protectView(cid) + defer s.viewDone() err := s.hot.View(cid, cb) switch err { @@ -594,7 +595,7 @@ func (s *SplitStore) HeadChange(_, apply []*types.TipSet) error { if epoch-s.baseEpoch > CompactionThreshold { // it's time to compact -- prepare the transaction and go! - wg := s.beginTxnProtect() + s.beginTxnProtect() go func() { defer atomic.StoreInt32(&s.compacting, 0) defer s.endTxnProtect() @@ -602,7 +603,7 @@ func (s *SplitStore) HeadChange(_, apply []*types.TipSet) error { log.Info("compacting splitstore") start := time.Now() - s.compact(curTs, wg) + s.compact(curTs) log.Infow("compaction done", "took", time.Since(start)) }() @@ -632,16 +633,36 @@ func (s *SplitStore) protectTipSets(apply []*types.TipSet) { } // transactionally protect a view -func (s *SplitStore) protectView(c cid.Cid) *sync.WaitGroup { +func (s *SplitStore) protectView(c cid.Cid) { s.txnLk.RLock() defer s.txnLk.RUnlock() - s.txnViews.Add(1) if s.txnActive { s.trackTxnRef(c) } - return &s.txnViews + s.txnViewsMx.Lock() + s.txnViews++ + s.txnViewsMx.Unlock() +} + +func (s *SplitStore) viewDone() { + s.txnViewsMx.Lock() + defer s.txnViewsMx.Unlock() + + s.txnViews-- + if s.txnViews == 0 { + s.txnViewsCond.Signal() + } +} + +func (s *SplitStore) viewWait() { + s.txnViewsMx.Lock() + defer s.txnViewsMx.Unlock() + + for s.txnViews > 0 { + s.txnViewsCond.Wait() + } } // transactionally protect a reference to an object @@ -933,10 +954,10 @@ func (s *SplitStore) doWarmup(curTs *types.TipSet) error { // - We sort cold objects heaviest first, so as to never delete the consituents of a DAG before the DAG itself (which would leave dangling references) // - We delete in small batches taking a lock; each batch is checked again for marks, from the concurrent transactional mark, so as to never delete anything live // - We then end the transaction and compact/gc the hotstore. -func (s *SplitStore) compact(curTs *types.TipSet, wg *sync.WaitGroup) { +func (s *SplitStore) compact(curTs *types.TipSet) { log.Info("waiting for active views to complete") start := time.Now() - wg.Wait() + s.viewWait() log.Infow("waiting for active views done", "took", time.Since(start)) start = time.Now() @@ -1126,7 +1147,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { return nil } -func (s *SplitStore) beginTxnProtect() *sync.WaitGroup { +func (s *SplitStore) beginTxnProtect() { log.Info("preparing compaction transaction") s.txnLk.Lock() @@ -1135,8 +1156,6 @@ func (s *SplitStore) beginTxnProtect() *sync.WaitGroup { s.txnActive = true s.txnRefs = make(map[cid.Cid]struct{}) s.txnMissing = make(map[cid.Cid]struct{}) - - return &s.txnViews } func (s *SplitStore) beginTxnMarking(markSet MarkSet) {