diff --git a/blockstore/splitstore/splitstore.go b/blockstore/splitstore/splitstore.go index 038d762af..c9727e4d0 100644 --- a/blockstore/splitstore/splitstore.go +++ b/blockstore/splitstore/splitstore.go @@ -798,11 +798,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { defer s.debug.Flush() // 0. Prepare the transaction - s.txnLk.Lock() - s.txnRefs = make(map[cid.Cid]struct{}) - s.txnActive = true - s.txnLookbackEpoch = lookbackEpoch - s.txnLk.Unlock() + s.prepareTxnProtect(lookbackEpoch) // 1. mark reachable objects by walking the chain from the current epoch to the boundary epoch log.Info("marking reachable objects") @@ -827,22 +823,9 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { log.Infow("marking done", "took", time.Since(startMark), "marked", count) - // fetch references taken during marking and create the transaction protect filter - s.txnLk.Lock() - txnRefs := s.txnRefs - s.txnRefs = nil - s.txnMissing = make(map[cid.Cid]struct{}) - s.txnProtect = markSet - s.txnLk.Unlock() - - defer func() { - s.txnLk.Lock() - _ = s.txnProtect.Close() - s.txnActive = false - s.txnProtect = nil - s.txnMissing = nil - s.txnLk.Unlock() - }() + // begin transactional protection and fetch references created while marking + txnRefs := s.beginTxnProtect(markSet) + defer s.endTxnProtect() // 1.1 Update markset for references created during marking if len(txnRefs) > 0 { @@ -958,7 +941,8 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { stats.Record(context.Background(), metrics.SplitstoreCompactionCold.M(int64(coldCnt))) // now that we have collected cold objects, check for missing references from transactional i/o - // and disable further collection of such references (they will not be acted upon) + // and disable further collection of such references (they will not be acted upon as we can't + // possibly delete objects we didn't have when we were collecting cold objects) s.waitForMissingRefs() // 3. copy the cold objects to the coldstore -- if we have one @@ -1003,6 +987,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { log.Infow("purging cold objects from hotstore done", "took", time.Since(startPurge)) // we are done; do some housekeeping + s.endTxnProtect() s.gcHotstore() err = s.setBaseEpoch(boundaryEpoch) @@ -1018,6 +1003,37 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { return nil } +func (s *SplitStore) prepareTxnProtect(lookbackEpoch abi.ChainEpoch) { + s.txnLk.Lock() + defer s.txnLk.Unlock() + + s.txnRefs = make(map[cid.Cid]struct{}) + s.txnActive = true + s.txnLookbackEpoch = lookbackEpoch +} + +func (s *SplitStore) beginTxnProtect(markSet MarkSet) map[cid.Cid]struct{} { + s.txnLk.Lock() + defer s.txnLk.Unlock() + + txnRefs := s.txnRefs + s.txnRefs = nil + s.txnMissing = make(map[cid.Cid]struct{}) + s.txnProtect = markSet + + return txnRefs +} + +func (s *SplitStore) endTxnProtect() { + s.txnLk.Lock() + defer s.txnLk.Unlock() + + _ = s.txnProtect.Close() + s.txnActive = false + s.txnProtect = nil + s.txnMissing = nil +} + func (s *SplitStore) walkChain(ts *types.TipSet, boundary abi.ChainEpoch, inclMsgs bool, f func(cid.Cid) error) error { visited := cid.NewSet()