properly handle protecting long-running views

vyzo 2021-07-09 13:20:18 +03:00
parent 565faff754
commit acc4c374ef

@@ -143,6 +143,7 @@ type SplitStore struct {
 	txnLk            sync.RWMutex
 	txnActive        bool
 	txnLookbackEpoch abi.ChainEpoch
+	txnViews         *sync.WaitGroup
 	txnProtect       MarkSet
 	txnRefsMx        sync.Mutex
 	txnRefs          map[cid.Cid]struct{}
@@ -184,6 +185,8 @@ func Open(path string, ds dstore.Datastore, hot, cold bstore.Blockstore, cfg *Co
 		cold:       cold,
 		markSetEnv: markSetEnv,
 
+		txnViews: new(sync.WaitGroup),
+
 		coldPurgeSize: defaultColdPurgeSize,
 	}
@@ -448,18 +451,28 @@ func (s *SplitStore) View(cid cid.Cid, cb func([]byte) error) error {
 		return cb(data)
 	}
 
-	// optimistically protect the reference so that we can call the underlying View
-	// without holding hte lock.
-	// This allows the user callback to call into the blockstore without deadlocking.
-	s.txnLk.RLock()
-	err := s.trackTxnRef(cid)
-	s.txnLk.RUnlock()
-
-	if err != nil {
-		log.Warnf("error tracking reference to %s: %s", cid, err)
-	}
-
-	err = s.hot.View(cid, cb)
+	err := s.hot.View(cid,
+		func(data []byte) error {
+			// views are protected two-fold:
+			// - if there is an active transaction, then the reference is protected.
+			// - if there is no active transaction, active views are tracked in a
+			//   wait group and compaction is inhibited from starting until they
+			//   have all completed. this is necessary to ensure that a (very) long-running
+			//   view can't have its data pointer deleted, which would be catastrophic.
+			//   Note that we can't just RLock for the duration of the view, as this could
+			//   lead to deadlock with recursive views.
+			wg, err := s.protectView(cid)
+			if err != nil {
+				log.Warnf("error protecting view to %s: %s", cid, err)
+			}
+
+			if wg != nil {
+				defer wg.Done()
+			}
+
+			return cb(data)
+		})
+
 	switch err {
 	case bstore.ErrNotFound:
 		if s.debug != nil {
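The comment block in this hunk describes the two-fold protection scheme. A minimal, self-contained sketch of that pattern follows; the names (store, view, compact) are simplified stand-ins, not the actual SplitStore code, and the transactional branch is only hinted at in a comment.

	package main

	import (
		"fmt"
		"sync"
		"time"
	)

	// store is a stand-in for SplitStore with just the pieces relevant here.
	type store struct {
		mx        sync.RWMutex
		txnActive bool
		views     *sync.WaitGroup
	}

	// view mirrors the scheme above: under the read lock, either register with
	// the wait group (no transaction running) or, in the real code, track the
	// cid as a transactional reference. The lock is released before the
	// callback runs, so recursive views cannot deadlock.
	func (s *store) view(cb func()) {
		s.mx.RLock()
		var wg *sync.WaitGroup
		if !s.txnActive {
			s.views.Add(1)
			wg = s.views
		}
		// else: the real code calls trackTxnRef(cid) here instead
		s.mx.RUnlock()

		if wg != nil {
			defer wg.Done()
		}
		cb()
	}

	// compact mirrors beginTxnProtect + compact: mark the transaction active,
	// take ownership of the current wait group, and wait for all in-flight
	// views before deleting anything.
	func (s *store) compact() {
		s.mx.Lock()
		s.txnActive = true
		wg := s.views
		s.views = nil
		s.mx.Unlock()

		wg.Wait()
		fmt.Println("all active views done; safe to purge")

		s.mx.Lock()
		s.txnActive = false
		s.views = new(sync.WaitGroup)
		s.mx.Unlock()
	}

	func main() {
		s := &store{views: new(sync.WaitGroup)}
		go s.view(func() { time.Sleep(100 * time.Millisecond) }) // a long-running view
		time.Sleep(10 * time.Millisecond)                        // let it register
		s.compact()                                              // blocks until the view completes
	}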
@@ -583,7 +596,7 @@ func (s *SplitStore) HeadChange(_, apply []*types.TipSet) error {
 	if epoch-s.baseEpoch > CompactionThreshold {
 		// it's time to compact -- prepare the transaction and go!
-		s.beginTxnProtect(curTs)
+		wg := s.beginTxnProtect(curTs)
 		go func() {
 			defer atomic.StoreInt32(&s.compacting, 0)
 			defer s.endTxnProtect()
@@ -591,7 +604,7 @@ func (s *SplitStore) HeadChange(_, apply []*types.TipSet) error {
 			log.Info("compacting splitstore")
 			start := time.Now()
-			s.compact(curTs)
+			s.compact(curTs, wg)
 			log.Infow("compaction done", "took", time.Since(start))
 		}()
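The compaction goroutine above releases the compacting flag with a deferred atomic.StoreInt32. The acquisition side is not shown in this hunk; assuming it is a compare-and-swap on the same flag (a common single-flight arrangement, not confirmed by this diff), the guard looks roughly like this:

	package main

	import (
		"fmt"
		"sync/atomic"
		"time"
	)

	type splitter struct {
		compacting int32 // 0 = idle, 1 = compaction running
	}

	func (s *splitter) maybeCompact() {
		// acquire the flag; a second trigger while compaction runs is a no-op
		if !atomic.CompareAndSwapInt32(&s.compacting, 0, 1) {
			fmt.Println("compaction already in progress")
			return
		}

		go func() {
			defer atomic.StoreInt32(&s.compacting, 0) // release the flag on exit
			fmt.Println("compacting")
			time.Sleep(20 * time.Millisecond)
		}()
	}

	func main() {
		s := &splitter{}
		s.maybeCompact()
		s.maybeCompact() // prints "compaction already in progress"
		time.Sleep(50 * time.Millisecond)
	}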
@@ -627,6 +640,20 @@ func (s *SplitStore) protectTipSets(apply []*types.TipSet) {
 	}()
 }
 
+// transactionally protect a view
+func (s *SplitStore) protectView(c cid.Cid) (*sync.WaitGroup, error) {
+	s.txnLk.RLock()
+	defer s.txnLk.RUnlock()
+
+	if !s.txnActive {
+		s.txnViews.Add(1)
+		return s.txnViews, nil
+	}
+
+	err := s.trackTxnRef(c)
+	return nil, err
+}
+
 // transactionally protect a reference to an object
 func (s *SplitStore) trackTxnRef(c cid.Cid) error {
 	if !s.txnActive {
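protectView holds the read lock only long enough to register the view; the caller gets the WaitGroup back and defers Done itself. Holding the read lock across the user callback would invite the recursive-view deadlock that the comment in View warns about. A small, hypothetical demonstration of that failure mode with a bare sync.RWMutex (not lotus code):

	package main

	import (
		"fmt"
		"sync"
		"time"
	)

	func main() {
		var mu sync.RWMutex
		done := make(chan struct{})

		go func() {
			mu.RLock() // outer view holds the read lock for its whole duration
			go func() {
				mu.Lock() // compaction requests the write lock and blocks
				mu.Unlock()
			}()
			time.Sleep(50 * time.Millisecond) // let the writer queue up

			mu.RLock() // recursive view: blocks behind the pending writer
			mu.RUnlock()
			mu.RUnlock()
			close(done)
		}()

		select {
		case <-done:
			fmt.Println("no deadlock (unexpected)")
		case <-time.After(500 * time.Millisecond):
			fmt.Println("deadlock: recursive RLock stuck behind a pending Lock")
		}
	}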
@ -844,8 +871,13 @@ func (s *SplitStore) doWarmup(curTs *types.TipSet) error {
// - We sort cold objects heaviest first, so as to never delete the consituents of a DAG before the DAG itself (which would leave dangling references) // - We sort cold objects heaviest first, so as to never delete the consituents of a DAG before the DAG itself (which would leave dangling references)
// - We delete in small batches taking a lock; each batch is checked again for marks, from the concurrent transactional mark, so as to never delete anything live // - We delete in small batches taking a lock; each batch is checked again for marks, from the concurrent transactional mark, so as to never delete anything live
// - We then end the transaction and compact/gc the hotstore. // - We then end the transaction and compact/gc the hotstore.
func (s *SplitStore) compact(curTs *types.TipSet) { func (s *SplitStore) compact(curTs *types.TipSet, wg *sync.WaitGroup) {
log.Info("waiting for active views to complete")
start := time.Now() start := time.Now()
wg.Wait()
log.Infow("waiting for active views done", "took", time.Since(start))
start = time.Now()
err := s.doCompact(curTs) err := s.doCompact(curTs)
took := time.Since(start).Milliseconds() took := time.Since(start).Milliseconds()
stats.Record(context.Background(), metrics.SplitstoreCompactionTimeSeconds.M(float64(took)/1e3)) stats.Record(context.Background(), metrics.SplitstoreCompactionTimeSeconds.M(float64(took)/1e3))
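The comment block at the top of this hunk summarizes the purge discipline: heaviest objects first, small batches under a lock, each batch re-checked against the concurrent marks. A rough, self-contained illustration of that discipline with made-up types (object, purge, and the weights are hypothetical, not the lotus implementation):

	package main

	import (
		"fmt"
		"sort"
		"sync"
	)

	type object struct {
		id     string
		weight int // e.g. DAG size; heavier objects are deleted first
	}

	func purge(cold []object, live map[string]bool, mx *sync.Mutex, batch int) {
		// heaviest first, so a DAG is never outlived by its constituents
		sort.Slice(cold, func(i, j int) bool { return cold[i].weight > cold[j].weight })

		for i := 0; i < len(cold); i += batch {
			end := i + batch
			if end > len(cold) {
				end = len(cold)
			}

			mx.Lock() // take the lock per batch, not for the whole purge
			for _, o := range cold[i:end] {
				if live[o.id] { // re-check marks: never delete anything live
					continue
				}
				fmt.Println("deleting", o.id)
			}
			mx.Unlock()
		}
	}

	func main() {
		var mx sync.Mutex
		cold := []object{{"leaf", 1}, {"root", 3}, {"branch", 2}}
		purge(cold, map[string]bool{"branch": true}, &mx, 2)
	}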
@@ -1079,16 +1111,21 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
 	return nil
 }
 
-func (s *SplitStore) beginTxnProtect(curTs *types.TipSet) {
+func (s *SplitStore) beginTxnProtect(curTs *types.TipSet) *sync.WaitGroup {
 	lookbackEpoch := curTs.Height() - CompactionLookback
 
 	log.Info("preparing compaction transaction")
 
 	s.txnLk.Lock()
 	defer s.txnLk.Unlock()
 
-	s.txnRefs = make(map[cid.Cid]struct{})
 	s.txnActive = true
 	s.txnLookbackEpoch = lookbackEpoch
+	s.txnRefs = make(map[cid.Cid]struct{})
+
+	wg := s.txnViews
+	s.txnViews = nil
+
+	return wg
 }
 
 func (s *SplitStore) beginTxnConcurrentMarking(markSet MarkSet) map[cid.Cid]struct{} {
@@ -1109,13 +1146,16 @@ func (s *SplitStore) endTxnProtect() {
 	s.txnLk.Lock()
 	defer s.txnLk.Unlock()
 
-	if s.txnProtect != nil {
-		_ = s.txnProtect.Close()
+	if !s.txnActive {
+		return
 	}
 
+	_ = s.txnProtect.Close()
+
 	s.txnActive = false
 	s.txnProtect = nil
 	s.txnRefs = nil
 	s.txnMissing = nil
+	s.txnViews = new(sync.WaitGroup)
 }
 
 func (s *SplitStore) walkChain(ts *types.TipSet, boundary abi.ChainEpoch, inclMsgs bool,