code cleanup: refactor txn state code into their own functions
This commit is contained in:
parent
3477d265c6
commit
e859942fa4
@ -798,11 +798,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
|
|||||||
defer s.debug.Flush()
|
defer s.debug.Flush()
|
||||||
|
|
||||||
// 0. Prepare the transaction
|
// 0. Prepare the transaction
|
||||||
s.txnLk.Lock()
|
s.prepareTxnProtect(lookbackEpoch)
|
||||||
s.txnRefs = make(map[cid.Cid]struct{})
|
|
||||||
s.txnActive = true
|
|
||||||
s.txnLookbackEpoch = lookbackEpoch
|
|
||||||
s.txnLk.Unlock()
|
|
||||||
|
|
||||||
// 1. mark reachable objects by walking the chain from the current epoch to the boundary epoch
|
// 1. mark reachable objects by walking the chain from the current epoch to the boundary epoch
|
||||||
log.Info("marking reachable objects")
|
log.Info("marking reachable objects")
|
||||||
@ -827,22 +823,9 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
|
|||||||
|
|
||||||
log.Infow("marking done", "took", time.Since(startMark), "marked", count)
|
log.Infow("marking done", "took", time.Since(startMark), "marked", count)
|
||||||
|
|
||||||
// fetch references taken during marking and create the transaction protect filter
|
// begin transactional protection and fetch references created while marking
|
||||||
s.txnLk.Lock()
|
txnRefs := s.beginTxnProtect(markSet)
|
||||||
txnRefs := s.txnRefs
|
defer s.endTxnProtect()
|
||||||
s.txnRefs = nil
|
|
||||||
s.txnMissing = make(map[cid.Cid]struct{})
|
|
||||||
s.txnProtect = markSet
|
|
||||||
s.txnLk.Unlock()
|
|
||||||
|
|
||||||
defer func() {
|
|
||||||
s.txnLk.Lock()
|
|
||||||
_ = s.txnProtect.Close()
|
|
||||||
s.txnActive = false
|
|
||||||
s.txnProtect = nil
|
|
||||||
s.txnMissing = nil
|
|
||||||
s.txnLk.Unlock()
|
|
||||||
}()
|
|
||||||
|
|
||||||
// 1.1 Update markset for references created during marking
|
// 1.1 Update markset for references created during marking
|
||||||
if len(txnRefs) > 0 {
|
if len(txnRefs) > 0 {
|
||||||
@ -958,7 +941,8 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
|
|||||||
stats.Record(context.Background(), metrics.SplitstoreCompactionCold.M(int64(coldCnt)))
|
stats.Record(context.Background(), metrics.SplitstoreCompactionCold.M(int64(coldCnt)))
|
||||||
|
|
||||||
// now that we have collected cold objects, check for missing references from transactional i/o
|
// now that we have collected cold objects, check for missing references from transactional i/o
|
||||||
// and disable further collection of such references (they will not be acted upon)
|
// and disable further collection of such references (they will not be acted upon as we can't
|
||||||
|
// possibly delete objects we didn't have when we were collecting cold objects)
|
||||||
s.waitForMissingRefs()
|
s.waitForMissingRefs()
|
||||||
|
|
||||||
// 3. copy the cold objects to the coldstore -- if we have one
|
// 3. copy the cold objects to the coldstore -- if we have one
|
||||||
@ -1003,6 +987,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
|
|||||||
log.Infow("purging cold objects from hotstore done", "took", time.Since(startPurge))
|
log.Infow("purging cold objects from hotstore done", "took", time.Since(startPurge))
|
||||||
|
|
||||||
// we are done; do some housekeeping
|
// we are done; do some housekeeping
|
||||||
|
s.endTxnProtect()
|
||||||
s.gcHotstore()
|
s.gcHotstore()
|
||||||
|
|
||||||
err = s.setBaseEpoch(boundaryEpoch)
|
err = s.setBaseEpoch(boundaryEpoch)
|
||||||
@ -1018,6 +1003,37 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *SplitStore) prepareTxnProtect(lookbackEpoch abi.ChainEpoch) {
|
||||||
|
s.txnLk.Lock()
|
||||||
|
defer s.txnLk.Unlock()
|
||||||
|
|
||||||
|
s.txnRefs = make(map[cid.Cid]struct{})
|
||||||
|
s.txnActive = true
|
||||||
|
s.txnLookbackEpoch = lookbackEpoch
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *SplitStore) beginTxnProtect(markSet MarkSet) map[cid.Cid]struct{} {
|
||||||
|
s.txnLk.Lock()
|
||||||
|
defer s.txnLk.Unlock()
|
||||||
|
|
||||||
|
txnRefs := s.txnRefs
|
||||||
|
s.txnRefs = nil
|
||||||
|
s.txnMissing = make(map[cid.Cid]struct{})
|
||||||
|
s.txnProtect = markSet
|
||||||
|
|
||||||
|
return txnRefs
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *SplitStore) endTxnProtect() {
|
||||||
|
s.txnLk.Lock()
|
||||||
|
defer s.txnLk.Unlock()
|
||||||
|
|
||||||
|
_ = s.txnProtect.Close()
|
||||||
|
s.txnActive = false
|
||||||
|
s.txnProtect = nil
|
||||||
|
s.txnMissing = nil
|
||||||
|
}
|
||||||
|
|
||||||
func (s *SplitStore) walkChain(ts *types.TipSet, boundary abi.ChainEpoch, inclMsgs bool,
|
func (s *SplitStore) walkChain(ts *types.TipSet, boundary abi.ChainEpoch, inclMsgs bool,
|
||||||
f func(cid.Cid) error) error {
|
f func(cid.Cid) error) error {
|
||||||
visited := cid.NewSet()
|
visited := cid.NewSet()
|
||||||
|
Loading…
Reference in New Issue
Block a user