From b5da33c0134f80e78e32e231640f871c258ef942 Mon Sep 17 00:00:00 2001 From: zenground0 Date: Sat, 4 Mar 2023 07:51:15 -0700 Subject: [PATCH] signal chain in and out of sync to compaction workers --- blockstore/splitstore/splitstore.go | 11 +++ blockstore/splitstore/splitstore_compact.go | 75 +++++++++++++++++---- 2 files changed, 72 insertions(+), 14 deletions(-) diff --git a/blockstore/splitstore/splitstore.go b/blockstore/splitstore/splitstore.go index bd9efb630..7535ec197 100644 --- a/blockstore/splitstore/splitstore.go +++ b/blockstore/splitstore/splitstore.go @@ -187,6 +187,11 @@ type SplitStore struct { ctx context.Context cancel func() + outOfSync int32 // for fast checking + chainSyncMx sync.RWMutex + chainSyncCond sync.Cond + chainSyncFinished bool // protected by chainSyncMx + debug *debugLog // transactional protection for concurrent read/writes during compaction @@ -261,6 +266,7 @@ func Open(path string, ds dstore.Datastore, hot, cold bstore.Blockstore, cfg *Co ss.txnViewsCond.L = &ss.txnViewsMx ss.txnSyncCond.L = &ss.txnSyncMx + ss.chainSyncCond.L = &ss.chainSyncMx ss.ctx, ss.cancel = context.WithCancel(context.Background()) ss.reifyCond.L = &ss.reifyMx @@ -822,6 +828,11 @@ func (s *SplitStore) Close() error { s.txnSyncCond.Broadcast() s.txnSyncMx.Unlock() + s.chainSyncMx.Lock() + s.chainSyncFinished = true + s.chainSyncCond.Broadcast() + s.chainSyncMx.Unlock() + log.Warn("close with ongoing compaction in progress; waiting for it to finish...") for atomic.LoadInt32(&s.compacting) == 1 { time.Sleep(time.Second) diff --git a/blockstore/splitstore/splitstore_compact.go b/blockstore/splitstore/splitstore_compact.go index e5cfec0e4..41dd128e9 100644 --- a/blockstore/splitstore/splitstore_compact.go +++ b/blockstore/splitstore/splitstore_compact.go @@ -91,7 +91,35 @@ func (s *SplitStore) HeadChange(_, apply []*types.TipSet) error { // Regardless, we put a mutex in HeadChange just to be safe if !atomic.CompareAndSwapInt32(&s.compacting, 0, 1) { - // we are 
currently compacting -- protect the new tipset(s)
+		// we are currently compacting
+		// 1. Signal sync condition to yield compaction when out of sync and resume when in sync
+		timestamp := time.Unix(int64(curTs.MinTimestamp()), 0)
+		if CheckSyncGap && time.Since(timestamp) > SyncGapTime {
+			/* Chain out of sync */
+			if atomic.CompareAndSwapInt32(&s.outOfSync, 0, 1) {
+				// transition from in sync to out of sync
+				s.chainSyncMx.Lock()
+				s.chainSyncFinished = false
+				s.chainSyncMx.Unlock()
+			}
+			// already out of sync, no signaling necessary
+
+		}
+		// TODO: ok to use hysteresis with no transitions between 30s and 1m?
+		if time.Since(timestamp) < SyncWaitTime {
+			/* Chain in sync */
+			if !atomic.CompareAndSwapInt32(&s.outOfSync, 1, 0) {
+				// already in sync, no signaling necessary
+			} else {
+				// transition from out of sync to in sync: flag was reset by the CAS above,
+				// so the fast path in waitForSync re-arms; wake any yielded compaction workers
+				s.chainSyncMx.Lock()
+				s.chainSyncFinished = true
+				s.chainSyncCond.Broadcast()
+				s.chainSyncMx.Unlock()
+			}
+
+		}
+		// 2. protect the new tipset(s)
 		s.protectTipSets(apply)
 		return nil
 	}
@@ -427,7 +455,7 @@ func (s *SplitStore) protectTxnRefs(markSet MarkSet) error {
 
 // transactionally protect a reference by walking the object and marking.
 // concurrent markings are short circuited by checking the markset.
func (s *SplitStore) doTxnProtect(root cid.Cid, markSet MarkSet) (int64, error) { - if err := s.checkClosing(); err != nil { + if err := s.checkYield(); err != nil { return 0, err } @@ -545,7 +573,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { } defer coldSet.Close() //nolint:errcheck - if err := s.checkClosing(); err != nil { + if err := s.checkYield(); err != nil { return err } @@ -617,7 +645,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { log.Infow("marking done", "took", time.Since(startMark), "marked", *count) - if err := s.checkClosing(); err != nil { + if err := s.checkYield(); err != nil { return err } @@ -627,7 +655,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { return xerrors.Errorf("error protecting transactional refs: %w", err) } - if err := s.checkClosing(); err != nil { + if err := s.checkYield(); err != nil { return err } @@ -704,7 +732,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { stats.Record(s.ctx, metrics.SplitstoreCompactionHot.M(hotCnt)) stats.Record(s.ctx, metrics.SplitstoreCompactionCold.M(coldCnt)) - if err := s.checkClosing(); err != nil { + if err := s.checkYield(); err != nil { return err } @@ -713,7 +741,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { // possibly delete objects we didn't have when we were collecting cold objects) s.waitForMissingRefs(markSet) - if err := s.checkClosing(); err != nil { + if err := s.checkYield(); err != nil { return err } @@ -733,7 +761,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { } log.Infow("moving done", "took", time.Since(startMove)) - if err := s.checkClosing(); err != nil { + if err := s.checkYield(); err != nil { return err } @@ -764,7 +792,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { } // wait for the head to catch up so that the current tipset is marked - s.waitForSync() + s.waitForTxnSync() if err := s.checkClosing(); err != nil { return err @@ -865,7 +893,7 @@ func (s 
*SplitStore) beginCriticalSection(markSet MarkSet) error {
 	return nil
 }
 
-func (s *SplitStore) waitForSync() {
+func (s *SplitStore) waitForTxnSync() {
 	log.Info("waiting for sync")
 	if !CheckSyncGap {
 		log.Warnf("If you see this outside of test it is a serious splitstore issue")
@@ -884,6 +912,25 @@
 	}
 }
 
+// Block compaction operations if chain sync has fallen behind
+func (s *SplitStore) waitForSync() {
+	if atomic.LoadInt32(&s.outOfSync) == 0 {
+		return
+	}
+	s.chainSyncMx.Lock()
+	defer s.chainSyncMx.Unlock()
+
+	for !s.chainSyncFinished {
+		s.chainSyncCond.Wait()
+	}
+}
+
+// Combined sync and closing check
+func (s *SplitStore) checkYield() error {
+	s.waitForSync()
+	return s.checkClosing()
+}
+
 func (s *SplitStore) endTxnProtect() {
 	s.txnLk.Lock()
 	defer s.txnLk.Unlock()
@@ -1037,7 +1084,7 @@ func (s *SplitStore) walkChain(ts *types.TipSet, inclState, inclMsgs abi.ChainEp
 
 	for len(toWalk) > 0 {
 		// walking can take a while, so check this with every opportunity
-		if err := s.checkClosing(); err != nil {
+		if err := s.checkYield(); err != nil {
 			return err
 		}
 
@@ -1106,7 +1153,7 @@ func (s *SplitStore) walkObject(c cid.Cid, visitor ObjectVisitor, f func(cid.Cid
 	}
 
 	// check this before recursing
-	if err := s.checkClosing(); err != nil {
+	if err := s.checkYield(); err != nil {
 		return 0, err
 	}
 
@@ -1175,7 +1222,7 @@ func (s *SplitStore) walkObjectIncomplete(c cid.Cid, visitor ObjectVisitor, f, m
 	}
 
 	// check this before recursing
-	if err := s.checkClosing(); err != nil {
+	if err := s.checkYield(); err != nil {
 		return sz, err
 	}
 
@@ -1262,7 +1309,7 @@ func (s *SplitStore) moveColdBlocks(coldr *ColdSetReader) error {
 	batch := make([]blocks.Block, 0, batchSize)
 
 	err := coldr.ForEach(func(c cid.Cid) error {
-		if err := s.checkClosing(); err != nil {
+		if err := s.checkYield(); err != nil {
 			return err
 		}
 		blk, err := s.hot.Get(s.ctx, c)