prepare the transaction before launching the compaction goroutine
This commit is contained in:
parent
f2f4af669d
commit
0e2af11f6a
@ -486,9 +486,11 @@ func (s *SplitStore) HeadChange(_, apply []*types.TipSet) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if epoch-s.baseEpoch > CompactionThreshold {
|
if epoch-s.baseEpoch > CompactionThreshold {
|
||||||
// it's time to compact
|
// it's time to compact -- prepare the transaction and go!
|
||||||
|
s.prepareTxnProtect(curTs)
|
||||||
go func() {
|
go func() {
|
||||||
defer atomic.StoreInt32(&s.compacting, 0)
|
defer atomic.StoreInt32(&s.compacting, 0)
|
||||||
|
defer s.endTxnProtect()
|
||||||
|
|
||||||
log.Info("compacting splitstore")
|
log.Info("compacting splitstore")
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
@ -755,9 +757,6 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
|
|||||||
defer markSet.Close() //nolint:errcheck
|
defer markSet.Close() //nolint:errcheck
|
||||||
defer s.debug.Flush()
|
defer s.debug.Flush()
|
||||||
|
|
||||||
// 0. Prepare the transaction
|
|
||||||
s.prepareTxnProtect(lookbackEpoch)
|
|
||||||
|
|
||||||
// 1. mark reachable objects by walking the chain from the current epoch; we keep state roots
|
// 1. mark reachable objects by walking the chain from the current epoch; we keep state roots
|
||||||
// and messages until the boundary epoch.
|
// and messages until the boundary epoch.
|
||||||
log.Info("marking reachable objects")
|
log.Info("marking reachable objects")
|
||||||
@ -782,9 +781,8 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
|
|||||||
|
|
||||||
log.Infow("marking done", "took", time.Since(startMark), "marked", count)
|
log.Infow("marking done", "took", time.Since(startMark), "marked", count)
|
||||||
|
|
||||||
// begin transactional protection and fetch references created while marking
|
// begin transactional protection with concurrent marking and fetch references created while marking
|
||||||
txnRefs := s.beginTxnProtect(markSet)
|
txnRefs := s.beginTxnProtect(markSet)
|
||||||
defer s.endTxnProtect()
|
|
||||||
|
|
||||||
// 1.1 Update markset for references created during marking
|
// 1.1 Update markset for references created during marking
|
||||||
if len(txnRefs) > 0 {
|
if len(txnRefs) > 0 {
|
||||||
@ -940,7 +938,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
|
|||||||
// 5. purge cold objects from the hotstore, taking protected references into account
|
// 5. purge cold objects from the hotstore, taking protected references into account
|
||||||
log.Info("purging cold objects from the hotstore")
|
log.Info("purging cold objects from the hotstore")
|
||||||
startPurge := time.Now()
|
startPurge := time.Now()
|
||||||
err = s.purge(curTs, cold)
|
err = s.purge(cold)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("error purging cold blocks: %w", err)
|
return xerrors.Errorf("error purging cold blocks: %w", err)
|
||||||
}
|
}
|
||||||
@ -963,7 +961,10 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SplitStore) prepareTxnProtect(lookbackEpoch abi.ChainEpoch) {
|
func (s *SplitStore) prepareTxnProtect(curTs *types.TipSet) {
|
||||||
|
lookbackEpoch := curTs.Height() - CompactionLookback
|
||||||
|
log.Info("preparing compaction transaction")
|
||||||
|
|
||||||
s.txnLk.Lock()
|
s.txnLk.Lock()
|
||||||
defer s.txnLk.Unlock()
|
defer s.txnLk.Unlock()
|
||||||
|
|
||||||
@ -1340,7 +1341,7 @@ func (s *SplitStore) purgeBatch(cids []cid.Cid, deleteBatch func([]cid.Cid) erro
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SplitStore) purge(curTs *types.TipSet, cids []cid.Cid) error {
|
func (s *SplitStore) purge(cids []cid.Cid) error {
|
||||||
deadCids := make([]cid.Cid, 0, batchSize)
|
deadCids := make([]cid.Cid, 0, batchSize)
|
||||||
var purgeCnt, liveCnt int
|
var purgeCnt, liveCnt int
|
||||||
defer func() {
|
defer func() {
|
||||||
|
Loading…
Reference in New Issue
Block a user