check the closing state variable often

so that we have a reasonably quick graceful shutdown
This commit is contained in:
vyzo 2021-07-08 10:13:44 +03:00
parent 4f808367f8
commit f5c45bd517

View File

@ -807,6 +807,10 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
defer markSet.Close() //nolint:errcheck
defer s.debug.Flush()
if err := s.checkClosing(); err != nil {
return err
}
// 1. mark reachable objects by walking the chain from the current epoch; we keep state roots
// and messages until the boundary epoch.
log.Info("marking reachable objects")
@ -815,6 +819,11 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
var count int64
err = s.walkChain(curTs, boundaryEpoch, true,
func(c cid.Cid) error {
// marking takes a while, so check this with every opportunity
if err := s.checkClosing(); err != nil {
return err
}
if isFilCommitment(c) {
return errStopWalk
}
@ -831,6 +840,10 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
log.Infow("marking done", "took", time.Since(startMark), "marked", count)
if err := s.checkClosing(); err != nil {
return err
}
// begin transactional protection with concurrent marking and fetch references created while marking
txnRefs := s.beginTxnConcurrentMarking(markSet)
@ -842,6 +855,10 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
count = 0
for c := range txnRefs {
if err := s.checkClosing(); err != nil {
return err
}
if isFilCommitment(c) {
continue
}
@ -906,6 +923,10 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
log.Infow("update mark set done", "took", time.Since(startMark), "marked", count)
}
if err := s.checkClosing(); err != nil {
return err
}
// 2. iterate through the hotstore to collect cold objects
log.Info("collecting cold objects")
startCollect := time.Now()
@ -947,11 +968,19 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
stats.Record(context.Background(), metrics.SplitstoreCompactionHot.M(int64(hotCnt)))
stats.Record(context.Background(), metrics.SplitstoreCompactionCold.M(int64(coldCnt)))
if err := s.checkClosing(); err != nil {
return err
}
// now that we have collected cold objects, check for missing references from transactional i/o
// and disable further collection of such references (they will not be acted upon as we can't
// possibly delete objects we didn't have when we were collecting cold objects)
s.waitForMissingRefs()
if err := s.checkClosing(); err != nil {
return err
}
// 3. copy the cold objects to the coldstore -- if we have one
if !s.cfg.DiscardColdBlocks {
log.Info("moving cold objects to the coldstore")
@ -980,9 +1009,8 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
defer atomic.StoreInt32(&s.critsection, 0)
// check to see if we are closing first; if that's the case just return
if atomic.LoadInt32(&s.closing) == 1 {
log.Info("splitstore is closing; aborting compaction")
return xerrors.Errorf("compaction aborted")
if err := s.checkClosing(); err != nil {
return err
}
// 5. purge cold objects from the hotstore, taking protected references into account
@ -1232,6 +1260,15 @@ func (s *SplitStore) has(c cid.Cid) (bool, error) {
return s.cold.Has(c)
}
func (s *SplitStore) checkClosing() error {
if atomic.LoadInt32(&s.closing) == 1 {
log.Info("splitstore is closing; aborting compaction")
return xerrors.Errorf("compaction aborted")
}
return nil
}
func (s *SplitStore) isOldBlockHeader(c cid.Cid, epoch abi.ChainEpoch) (isOldBlock bool, err error) {
if c.Prefix().Codec != cid.DagCBOR {
return false, nil
@ -1252,6 +1289,10 @@ func (s *SplitStore) moveColdBlocks(cold []cid.Cid) error {
batch := make([]blocks.Block, 0, batchSize)
for _, c := range cold {
if err := s.checkClosing(); err != nil {
return err
}
blk, err := s.hot.Get(c)
if err != nil {
if err == bstore.ErrNotFound {
@ -1300,6 +1341,11 @@ func (s *SplitStore) sortObjects(cids []cid.Cid) error {
// compute sorting weights as the cumulative number of DAG links
weights := make(map[string]int)
for _, c := range cids {
// this can take quite a while, so check for shutdown with every opportunity
if err := s.checkClosing(); err != nil {
return err
}
w := s.getObjectWeight(c, weights, key)
weights[key(c)] = w
}
@ -1402,9 +1448,8 @@ func (s *SplitStore) purge(cids []cid.Cid) error {
func(cids []cid.Cid) error {
deadCids := deadCids[:0]
if atomic.LoadInt32(&s.closing) == 1 {
log.Info("splitstore is closing; aborting purge")
return xerrors.Errorf("compaction aborted")
if err := s.checkClosing(); err != nil {
return err
}
s.txnLk.Lock()
@ -1461,6 +1506,10 @@ func (s *SplitStore) waitForMissingRefs() {
}()
for i := 0; i < 3 && len(missing) > 0; i++ {
if err := s.checkClosing(); err != nil {
return
}
wait := time.Duration(i) * time.Minute
log.Infof("retrying for %d missing references in %s (attempt: %d)", len(missing), wait, i+1)
if wait > 0 {