From 0e2af11f6a38ec1268a5009cc9895003726b9a02 Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 7 Jul 2021 01:39:58 +0300 Subject: [PATCH] prepare the transaction before launching the compaction goroutine --- blockstore/splitstore/splitstore.go | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/blockstore/splitstore/splitstore.go b/blockstore/splitstore/splitstore.go index f13107a16..fa38621d3 100644 --- a/blockstore/splitstore/splitstore.go +++ b/blockstore/splitstore/splitstore.go @@ -486,9 +486,11 @@ func (s *SplitStore) HeadChange(_, apply []*types.TipSet) error { } if epoch-s.baseEpoch > CompactionThreshold { - // it's time to compact + // it's time to compact -- prepare the transaction and go! + s.prepareTxnProtect(curTs) go func() { defer atomic.StoreInt32(&s.compacting, 0) + defer s.endTxnProtect() log.Info("compacting splitstore") start := time.Now() @@ -755,9 +757,6 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { defer markSet.Close() //nolint:errcheck defer s.debug.Flush() - // 0. Prepare the transaction - s.prepareTxnProtect(lookbackEpoch) - // 1. mark reachable objects by walking the chain from the current epoch; we keep state roots // and messages until the boundary epoch. log.Info("marking reachable objects") @@ -782,9 +781,8 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { log.Infow("marking done", "took", time.Since(startMark), "marked", count) - // begin transactional protection and fetch references created while marking + // begin transactional protection with concurrent marking and fetch references created while marking txnRefs := s.beginTxnProtect(markSet) - defer s.endTxnProtect() // 1.1 Update markset for references created during marking if len(txnRefs) > 0 { @@ -940,7 +938,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { // 5. purge cold objects from the hotstore, taking protected references into account log.Info("purging cold objects from the hotstore") startPurge := time.Now() - err = s.purge(curTs, cold) + err = s.purge(cold) if err != nil { return xerrors.Errorf("error purging cold blocks: %w", err) } @@ -963,7 +961,10 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { return nil } -func (s *SplitStore) prepareTxnProtect(lookbackEpoch abi.ChainEpoch) { +func (s *SplitStore) prepareTxnProtect(curTs *types.TipSet) { + lookbackEpoch := curTs.Height() - CompactionLookback + log.Info("preparing compaction transaction") + s.txnLk.Lock() defer s.txnLk.Unlock() @@ -1340,7 +1341,7 @@ func (s *SplitStore) purgeBatch(cids []cid.Cid, deleteBatch func([]cid.Cid) erro return nil } -func (s *SplitStore) purge(curTs *types.TipSet, cids []cid.Cid) error { +func (s *SplitStore) purge(cids []cid.Cid) error { deadCids := make([]cid.Cid, 0, batchSize) var purgeCnt, liveCnt int defer func() {