diff --git a/blockstore/splitstore/splitstore.go b/blockstore/splitstore/splitstore.go index d72ec665d..e1e7463d4 100644 --- a/blockstore/splitstore/splitstore.go +++ b/blockstore/splitstore/splitstore.go @@ -807,6 +807,10 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { defer markSet.Close() //nolint:errcheck defer s.debug.Flush() + if err := s.checkClosing(); err != nil { + return err + } + // 1. mark reachable objects by walking the chain from the current epoch; we keep state roots // and messages until the boundary epoch. log.Info("marking reachable objects") @@ -815,6 +819,11 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { var count int64 err = s.walkChain(curTs, boundaryEpoch, true, func(c cid.Cid) error { + // marking takes a while, so check for shutdown at every opportunity + if err := s.checkClosing(); err != nil { + return err + } + if isFilCommitment(c) { return errStopWalk } @@ -831,6 +840,10 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { log.Infow("marking done", "took", time.Since(startMark), "marked", count) + if err := s.checkClosing(); err != nil { + return err + } + // begin transactional protection with concurrent marking and fetch references created while marking txnRefs := s.beginTxnConcurrentMarking(markSet) @@ -842,6 +855,10 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { count = 0 for c := range txnRefs { + if err := s.checkClosing(); err != nil { + return err + } + if isFilCommitment(c) { continue } @@ -906,6 +923,10 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { log.Infow("update mark set done", "took", time.Since(startMark), "marked", count) } + if err := s.checkClosing(); err != nil { + return err + } + // 2. 
iterate through the hotstore to collect cold objects log.Info("collecting cold objects") startCollect := time.Now() @@ -947,11 +968,19 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { stats.Record(context.Background(), metrics.SplitstoreCompactionHot.M(int64(hotCnt))) stats.Record(context.Background(), metrics.SplitstoreCompactionCold.M(int64(coldCnt))) + if err := s.checkClosing(); err != nil { + return err + } + // now that we have collected cold objects, check for missing references from transactional i/o // and disable further collection of such references (they will not be acted upon as we can't // possibly delete objects we didn't have when we were collecting cold objects) s.waitForMissingRefs() + if err := s.checkClosing(); err != nil { + return err + } + // 3. copy the cold objects to the coldstore -- if we have one if !s.cfg.DiscardColdBlocks { log.Info("moving cold objects to the coldstore") @@ -980,9 +1009,8 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { defer atomic.StoreInt32(&s.critsection, 0) // check to see if we are closing first; if that's the case just return - if atomic.LoadInt32(&s.closing) == 1 { - log.Info("splitstore is closing; aborting compaction") - return xerrors.Errorf("compaction aborted") + if err := s.checkClosing(); err != nil { + return err } // 5. 
purge cold objects from the hotstore, taking protected references into account @@ -1232,6 +1260,15 @@ func (s *SplitStore) has(c cid.Cid) (bool, error) { return s.cold.Has(c) } +func (s *SplitStore) checkClosing() error { + if atomic.LoadInt32(&s.closing) == 1 { + log.Info("splitstore is closing; aborting compaction") + return xerrors.Errorf("compaction aborted") + } + + return nil +} + func (s *SplitStore) isOldBlockHeader(c cid.Cid, epoch abi.ChainEpoch) (isOldBlock bool, err error) { if c.Prefix().Codec != cid.DagCBOR { return false, nil @@ -1252,6 +1289,10 @@ func (s *SplitStore) moveColdBlocks(cold []cid.Cid) error { batch := make([]blocks.Block, 0, batchSize) for _, c := range cold { + if err := s.checkClosing(); err != nil { + return err + } + blk, err := s.hot.Get(c) if err != nil { if err == bstore.ErrNotFound { @@ -1300,6 +1341,11 @@ func (s *SplitStore) sortObjects(cids []cid.Cid) error { // compute sorting weights as the cumulative number of DAG links weights := make(map[string]int) for _, c := range cids { + // this can take quite a while, so check for shutdown at every opportunity + if err := s.checkClosing(); err != nil { + return err + } + w := s.getObjectWeight(c, weights, key) weights[key(c)] = w } @@ -1402,9 +1448,8 @@ func (s *SplitStore) purge(cids []cid.Cid) error { func(cids []cid.Cid) error { deadCids := deadCids[:0] - if atomic.LoadInt32(&s.closing) == 1 { - log.Info("splitstore is closing; aborting purge") - return xerrors.Errorf("compaction aborted") + if err := s.checkClosing(); err != nil { + return err } s.txnLk.Lock() @@ -1461,6 +1506,10 @@ func (s *SplitStore) waitForMissingRefs() { }() for i := 0; i < 3 && len(missing) > 0; i++ { + if err := s.checkClosing(); err != nil { + return + } + wait := time.Duration(i) * time.Minute log.Infof("retrying for %d missing references in %s (attempt: %d)", len(missing), wait, i+1) if wait > 0 {