don't mark references inline; instead rely on the main compaction thread to do concurrent marking

The problem is that an inline marking can take minutes for some objects
(infrequent, but still possible for state roots, and prohibitive if it happens during block validation).
So instead we track references continuously and rely on the main compaction thread to trigger
concurrent marking for all tracked references at opportune moments.

Assumption: we can mark references faster than they are created during purge, or else we'll
never purge anything.
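
In outline: the hot I/O paths (Has/Get/Put/View) now only record the CIDs they touch, and the
compaction thread periodically drains those records and does the marking itself, retrying until
it catches up. Below is a minimal sketch of that pattern; it is illustrative only, with MarkSet
pared down and tracker, trackRef and protectRefs as made-up stand-ins for the txnRefs machinery
in the diff that follows.

package sketch

import (
	"sync"

	"github.com/ipfs/go-cid"
)

// MarkSet is a pared-down stand-in for the splitstore markset interface.
type MarkSet interface {
	Mark(cid.Cid) error
	Has(cid.Cid) (bool, error)
}

// tracker is a toy version of the txnRefs machinery.
type tracker struct {
	mx   sync.Mutex
	refs map[cid.Cid]struct{}
}

// trackRef is what the hot I/O paths do now: record the reference and
// return immediately, instead of walking and marking the object inline.
func (t *tracker) trackRef(c cid.Cid) {
	t.mx.Lock()
	t.refs[c] = struct{}{}
	t.mx.Unlock()
}

// protectRefs is what the compaction thread does at opportune moments:
// drain the tracked references and mark them, looping until no new
// references arrived while it was marking.
func (t *tracker) protectRefs(markSet MarkSet) error {
	for {
		t.mx.Lock()
		refs := t.refs
		t.refs = make(map[cid.Cid]struct{})
		t.mx.Unlock()

		if len(refs) == 0 {
			return nil // caught up; safe to proceed, e.g. with a purge batch
		}

		for c := range refs {
			mark, err := markSet.Has(c)
			if err != nil {
				return err
			}
			if mark {
				continue // already marked by a concurrent walk
			}
			if err := markSet.Mark(c); err != nil {
				return err
			}
		}
	}
}

The retry loop is where the assumption above bites: purge runs the same drain before every
delete batch, so it terminates only if marking outpaces the creation of new references.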
Author: vyzo
Date:   2021-07-09 15:10:02 +03:00
Parent: acc4c374ef
Commit: da0feb3fa4


@@ -6,6 +6,7 @@ import (
 	"encoding/binary"
 	"errors"
 	"os"
+	"runtime"
 	"sort"
 	"sync"
 	"sync/atomic"
@@ -228,11 +229,7 @@ func (s *SplitStore) Has(cid cid.Cid) (bool, error) {
 	}
 
 	if has {
-		err = s.trackTxnRef(cid)
-		if err != nil {
-			log.Warnf("error tracking reference to %s: %s", cid, err)
-		}
-
+		s.trackTxnRef(cid)
 		return true, nil
 	}
@@ -256,11 +253,7 @@ func (s *SplitStore) Get(cid cid.Cid) (blocks.Block, error) {
 	switch err {
 	case nil:
-		err = s.trackTxnRef(cid)
-		if err != nil {
-			log.Warnf("error tracking reference to %s: %s", cid, err)
-		}
-
+		s.trackTxnRef(cid)
 		return blk, nil
 
 	case bstore.ErrNotFound:
@@ -302,11 +295,7 @@ func (s *SplitStore) GetSize(cid cid.Cid) (int, error) {
 	switch err {
 	case nil:
-		err = s.trackTxnRef(cid)
-		if err != nil {
-			log.Warnf("error tracking reference to %s: %s", cid, err)
-		}
-
+		s.trackTxnRef(cid)
 		return size, nil
 
 	case bstore.ErrNotFound:
@@ -345,11 +334,7 @@ func (s *SplitStore) Put(blk blocks.Block) error {
 	s.debug.LogWrite(blk)
 
-	err = s.trackTxnRef(blk.Cid())
-	if err != nil {
-		log.Warnf("error tracking reference to %s: %s", blk.Cid(), err)
-	}
-
+	s.trackTxnRef(blk.Cid())
 	return nil
 }
@@ -394,11 +379,7 @@ func (s *SplitStore) PutMany(blks []blocks.Block) error {
 	s.debug.LogWriteMany(blks)
 
-	err = s.trackTxnRefMany(batch)
-	if err != nil {
-		log.Warnf("error tracking reference to batch: %s", err)
-	}
-
+	s.trackTxnRefMany(batch)
 	return nil
 }
@@ -461,11 +442,7 @@ func (s *SplitStore) View(cid cid.Cid, cb func([]byte) error) error {
 	// view can't have its data pointer deleted, which would be catastrophic.
 	// Note that we can't just RLock for the duration of the view, as this could
 	// lead to deadlock with recursive views.
-	wg, err := s.protectView(cid)
-	if err != nil {
-		log.Warnf("error protecting view to %s: %s", cid, err)
-	}
-
+	wg := s.protectView(cid)
 	if wg != nil {
 		defer wg.Done()
 	}
@@ -619,115 +596,191 @@ func (s *SplitStore) HeadChange(_, apply []*types.TipSet) error {
 // transactionally protect incoming tipsets
 func (s *SplitStore) protectTipSets(apply []*types.TipSet) {
 	s.txnLk.RLock()
+	defer s.txnLk.RUnlock()
 
 	if !s.txnActive {
-		s.txnLk.RUnlock()
 		return
 	}
 
-	// do this in a goroutine to avoid blocking the notifier
-	go func() {
-		defer s.txnLk.RUnlock()
-
-		var cids []cid.Cid
-		for _, ts := range apply {
-			cids = append(cids, ts.Cids()...)
-		}
+	var cids []cid.Cid
+	for _, ts := range apply {
+		cids = append(cids, ts.Cids()...)
+	}
 
-		err := s.trackTxnRefMany(cids)
-		if err != nil {
-			log.Errorf("error protecting newly applied tipsets: %s", err)
-		}
-	}()
+	s.trackTxnRefMany(cids)
 }
 
 // transactionally protect a view
-func (s *SplitStore) protectView(c cid.Cid) (*sync.WaitGroup, error) {
+func (s *SplitStore) protectView(c cid.Cid) *sync.WaitGroup {
 	s.txnLk.RLock()
 	defer s.txnLk.RUnlock()
 
 	if !s.txnActive {
 		s.txnViews.Add(1)
-		return s.txnViews, nil
+		return s.txnViews
 	}
 
-	err := s.trackTxnRef(c)
-	return nil, err
+	s.trackTxnRef(c)
+	return nil
 }
 
 // transactionally protect a reference to an object
-func (s *SplitStore) trackTxnRef(c cid.Cid) error {
+func (s *SplitStore) trackTxnRef(c cid.Cid) {
 	if !s.txnActive {
 		// not compacting
-		return nil
+		return
 	}
 
-	if s.txnRefs != nil {
-		// we haven't finished marking yet, so track the reference
-		s.txnRefsMx.Lock()
-		s.txnRefs[c] = struct{}{}
-		s.txnRefsMx.Unlock()
-		return nil
+	if isUnitaryObject(c) {
+		return
 	}
 
-	// we have finished marking, protect the reference
-	return s.doTxnProtect(c, nil)
+	if s.txnProtect != nil {
+		mark, err := s.txnProtect.Has(c)
+		if err != nil {
+			log.Warnf("error checking markset: %s", err)
+			goto track
+		}
+
+		if mark {
+			return
+		}
+	}
+
+track:
+	s.txnRefsMx.Lock()
+	s.txnRefs[c] = struct{}{}
+	s.txnRefsMx.Unlock()
+	return
 }
 
 // transactionally protect a batch of references
-func (s *SplitStore) trackTxnRefMany(cids []cid.Cid) error {
+func (s *SplitStore) trackTxnRefMany(cids []cid.Cid) {
 	if !s.txnActive {
 		// not compacting
-		return nil
+		return
 	}
 
-	if s.txnRefs != nil {
-		// we haven't finished marking yet, so track the references
-		s.txnRefsMx.Lock()
-		for _, c := range cids {
-			s.txnRefs[c] = struct{}{}
-		}
-		s.txnRefsMx.Unlock()
-		return nil
-	}
-
-	// we have finished marking, protect the refs
-	batch := make(map[cid.Cid]struct{}, len(cids))
-	for _, c := range cids {
-		batch[c] = struct{}{}
-	}
-
-	for _, c := range cids {
-		err := s.doTxnProtect(c, batch)
-		if err != nil {
-			return err
-		}
-	}
-
-	return nil
+	s.txnRefsMx.Lock()
+	quiet := false
+	for _, c := range cids {
+		if isUnitaryObject(c) {
+			continue
+		}
+
+		if s.txnProtect != nil {
+			mark, err := s.txnProtect.Has(c)
+			if err != nil {
+				if !quiet {
+					quiet = true
+					log.Warnf("error checking markset: %s", err)
+				}
+				continue
+			}
+
+			if mark {
+				continue
+			}
+		}
+
+		s.txnRefs[c] = struct{}{}
+	}
+	s.txnRefsMx.Unlock()
+	return
 }
+
+// protect all pending transactional references
+func (s *SplitStore) protectTxnRefs(markSet MarkSet) error {
+	for {
+		var txnRefs map[cid.Cid]struct{}
+
+		s.txnRefsMx.Lock()
+		if len(s.txnRefs) > 0 {
+			txnRefs = s.txnRefs
+			s.txnRefs = make(map[cid.Cid]struct{})
+		}
+		s.txnRefsMx.Unlock()
+
+		if len(txnRefs) == 0 {
+			return nil
+		}
+
+		log.Infow("protecting transactional references", "refs", len(txnRefs))
+		count := 0
+		workch := make(chan cid.Cid, len(txnRefs))
+		startProtect := time.Now()
+
+		for c := range txnRefs {
+			mark, err := markSet.Has(c)
+			if err != nil {
+				return xerrors.Errorf("error checking markset: %w", err)
+			}
+
+			if mark {
+				continue
+			}
+
+			workch <- c
+			count++
+		}
+
+		if count == 0 {
+			return nil
+		}
+
+		workers := runtime.NumCPU() / 2
+		if workers < 2 {
+			workers = 2
+		}
+		if workers > count {
+			workers = count
+		}
+
+		close(workch)
+
+		worker := func(wg *sync.WaitGroup) {
+			if wg != nil {
+				defer wg.Done()
+			}
+
+			for c := range workch {
+				err := s.doTxnProtect(c, markSet)
+				if err != nil {
+					log.Warnf("error protecting transactional references: %s", err)
+					return
+				}
+			}
+		}
+
+		if workers > 1 {
+			wg := new(sync.WaitGroup)
+			for i := 0; i < workers; i++ {
+				wg.Add(1)
+				go worker(wg)
+			}
+			wg.Wait()
+		} else {
+			worker(nil)
+		}
+
+		log.Infow("protecting transactional refs done", "took", time.Since(startProtect), "protected", count)
+	}
+}
 
 // transactionally protect a reference by walking the object and marking.
 // concurrent markings are short circuited by checking the markset.
-func (s *SplitStore) doTxnProtect(root cid.Cid, batch map[cid.Cid]struct{}) error {
+func (s *SplitStore) doTxnProtect(root cid.Cid, markSet MarkSet) error {
 	// Note: cold objects are deleted heaviest first, so the consituents of an object
 	// cannot be deleted before the object itself.
-	err := s.walkObjectIncomplete(root, cid.NewSet(),
+	return s.walkObjectIncomplete(root, cid.NewSet(),
 		func(c cid.Cid) error {
 			if isUnitaryObject(c) {
 				return errStopWalk
 			}
 
-			if c != root {
-				_, ok := batch[c]
-				if ok {
-					// it's on the same batch, stop walk
-					return errStopWalk
-				}
-			}
-
-			mark, err := s.txnProtect.Has(c)
+			mark, err := markSet.Has(c)
 			if err != nil {
-				return xerrors.Errorf("error checking mark set for %s: %w", c, err)
+				return xerrors.Errorf("error checking markset: %w", err)
 			}
 
 			// it's marked, nothing to do
@@ -735,23 +788,17 @@ func (s *SplitStore) doTxnProtect(root cid.Cid, batch map[cid.Cid]struct{}) error {
 				return errStopWalk
 			}
 
-			return s.txnProtect.Mark(c)
+			return markSet.Mark(c)
 		},
 		func(c cid.Cid) error {
-			log.Warnf("missing object reference %s in %s", c, root)
 			if s.txnMissing != nil {
+				log.Warnf("missing object reference %s in %s", c, root)
 				s.txnRefsMx.Lock()
 				s.txnMissing[c] = struct{}{}
 				s.txnRefsMx.Unlock()
 			}
 			return errStopWalk
 		})
-
-	if err != nil {
-		log.Warnf("error protecting object (cid: %s): %s", root, err)
-	}
-
-	return err
 }
 
 // warmup acuiqres the compaction lock and spawns a goroutine to warm up the hotstore;
@@ -905,6 +952,9 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
 		return err
 	}
 
+	// we are ready for concurrent marking
+	s.beginTxnMarking(markSet)
+
 	// 1. mark reachable objects by walking the chain from the current epoch; we keep state roots
 	// and messages until the boundary epoch.
 	log.Info("marking reachable objects")
@@ -933,66 +983,10 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
 		return err
 	}
 
-	// begin transactional protection with concurrent marking and fetch references created while marking
-	txnRefs := s.beginTxnConcurrentMarking(markSet)
-
-	// 1.1 Update markset for references created during marking
-	if len(txnRefs) > 0 {
-		log.Infow("updating mark set for live references", "refs", len(txnRefs))
-		startMark = time.Now()
-		walked := cid.NewSet()
-		count = 0
-
-		for c := range txnRefs {
-			if err := s.checkClosing(); err != nil {
-				return err
-			}
-
-			if isUnitaryObject(c) {
-				continue
-			}
-
-			mark, err := markSet.Has(c)
-			if err != nil {
-				return xerrors.Errorf("error checking markset for %s: %w", c, err)
-			}
-
-			if mark {
-				continue
-			}
-
-			err = s.walkObjectIncomplete(c, walked,
-				func(c cid.Cid) error {
-					if isUnitaryObject(c) {
-						return errStopWalk
-					}
-
-					mark, err := markSet.Has(c)
-					if err != nil {
-						return xerrors.Errorf("error checking markset for %s: %w", c, err)
-					}
-
-					if mark {
-						return errStopWalk
-					}
-
-					count++
-					return markSet.Mark(c)
-				},
-				func(cm cid.Cid) error {
-					log.Warnf("missing object reference %s in %s", cm, c) //nolint
-					s.txnRefsMx.Lock()
-					s.txnMissing[cm] = struct{}{}
-					s.txnRefsMx.Unlock()
-					return errStopWalk
-				})
-
-			if err != nil {
-				return xerrors.Errorf("error walking %s for marking: %w", c, err)
-			}
-		}
-
-		log.Infow("update mark set done", "took", time.Since(startMark), "marked", count)
-	}
+	// 1.1 protect transactional refs
+	err = s.protectTxnRefs(markSet)
+	if err != nil {
+		return xerrors.Errorf("error protecting transactional refs: %w", err)
+	}
 
 	if err := s.checkClosing(); err != nil {
@@ -1047,7 +1041,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
 	// now that we have collected cold objects, check for missing references from transactional i/o
 	// and disable further collection of such references (they will not be acted upon as we can't
 	// possibly delete objects we didn't have when we were collecting cold objects)
-	s.waitForMissingRefs()
+	s.waitForMissingRefs(markSet)
 
 	if err := s.checkClosing(); err != nil {
 		return err
@@ -1062,6 +1056,10 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
 			return xerrors.Errorf("error moving cold objects: %w", err)
 		}
 		log.Infow("moving done", "took", time.Since(startMove))
+
+		if err := s.checkClosing(); err != nil {
+			return err
+		}
 	}
 
 	// 4. sort cold objects so that the dags with most references are deleted first
@@ -1075,6 +1073,16 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
 	}
 	log.Infow("sorting done", "took", time.Since(startSort))
 
+	// 4.1 protect transactional refs once more
+	err = s.protectTxnRefs(markSet)
+	if err != nil {
+		return xerrors.Errorf("error protecting transactional refs: %w", err)
+	}
+
+	if err := s.checkClosing(); err != nil {
+		return err
+	}
+
 	// Enter critical section
 	log.Info("entering critical section")
 	atomic.StoreInt32(&s.critsection, 1)
@@ -1088,7 +1096,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
 	// 5. purge cold objects from the hotstore, taking protected references into account
 	log.Info("purging cold objects from the hotstore")
 	startPurge := time.Now()
-	err = s.purge(cold)
+	err = s.purge(cold, markSet)
 	if err != nil {
 		return xerrors.Errorf("error purging cold blocks: %w", err)
 	}
@@ -1121,6 +1129,7 @@ func (s *SplitStore) beginTxnProtect(curTs *types.TipSet) *sync.WaitGroup {
 	s.txnActive = true
 	s.txnLookbackEpoch = lookbackEpoch
 	s.txnRefs = make(map[cid.Cid]struct{})
+	s.txnMissing = make(map[cid.Cid]struct{})
 
 	wg := s.txnViews
 	s.txnViews = nil
@@ -1128,18 +1137,12 @@ func (s *SplitStore) beginTxnProtect(curTs *types.TipSet) *sync.WaitGroup {
 	return wg
 }
 
-func (s *SplitStore) beginTxnConcurrentMarking(markSet MarkSet) map[cid.Cid]struct{} {
-	s.txnLk.Lock()
-	defer s.txnLk.Unlock()
-
+func (s *SplitStore) beginTxnMarking(markSet MarkSet) {
 	markSet.SetConcurrent()
 
-	txnRefs := s.txnRefs
-	s.txnRefs = nil
-	s.txnMissing = make(map[cid.Cid]struct{})
+	s.txnLk.Lock()
 	s.txnProtect = markSet
-
-	return txnRefs
+	s.txnLk.Unlock()
 }
 
 func (s *SplitStore) endTxnProtect() {
@@ -1150,7 +1153,6 @@ func (s *SplitStore) endTxnProtect() {
 		return
 	}
 
-	_ = s.txnProtect.Close()
 	s.txnActive = false
 	s.txnProtect = nil
 	s.txnRefs = nil
@@ -1508,7 +1510,7 @@ func (s *SplitStore) purgeBatch(cids []cid.Cid, deleteBatch func([]cid.Cid) error) error {
 	return nil
 }
 
-func (s *SplitStore) purge(cids []cid.Cid) error {
+func (s *SplitStore) purge(cids []cid.Cid, markSet MarkSet) error {
 	deadCids := make([]cid.Cid, 0, batchSize)
 	var purgeCnt, liveCnt int
 	defer func() {
@@ -1519,15 +1521,26 @@ func (s *SplitStore) purge(cids []cid.Cid) error {
 		func(cids []cid.Cid) error {
 			deadCids := deadCids[:0]
 
+		again:
+			if err := s.checkClosing(); err != nil {
+				return err
+			}
+
 			s.txnLk.Lock()
-			defer s.txnLk.Unlock()
+			if len(s.txnRefs) > 0 {
+				s.txnLk.Unlock()
+
+				err := s.protectTxnRefs(markSet)
+				if err != nil {
+					return xerrors.Errorf("error protecting transactional refs: %w", err)
+				}
+
+				goto again
+			}
+			defer s.txnLk.Unlock()
 
 			for _, c := range cids {
-				live, err := s.txnProtect.Has(c)
+				live, err := markSet.Has(c)
 				if err != nil {
 					return xerrors.Errorf("error checking for liveness: %w", err)
 				}
@@ -1559,7 +1572,7 @@ func (s *SplitStore) purge(cids []cid.Cid) error {
 // have this gem[TM].
 // My best guess is that they are parent message receipts or yet to be computed state roots; magik
 // thinks the cause may be block validation.
-func (s *SplitStore) waitForMissingRefs() {
+func (s *SplitStore) waitForMissingRefs(markSet MarkSet) {
 	s.txnLk.Lock()
 	missing := s.txnMissing
 	s.txnMissing = nil
@@ -1598,7 +1611,7 @@ func (s *SplitStore) waitForMissingRefs() {
 					return errStopWalk
 				}
 
-				mark, err := s.txnProtect.Has(c)
+				mark, err := markSet.Has(c)
 				if err != nil {
 					return xerrors.Errorf("error checking markset for %s: %w", c, err)
 				}
@@ -1608,7 +1621,7 @@ func (s *SplitStore) waitForMissingRefs() {
 				}
 
 				count++
-				return s.txnProtect.Mark(c)
+				return markSet.Mark(c)
 			},
 			func(c cid.Cid) error {
 				missing[c] = struct{}{}
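
A note on the concurrency shape of protectTxnRefs in the diff above: it is a plain fixed-size
worker pool draining a pre-filled, closed channel. A standalone toy version follows, with
made-up work items and names, not the lotus code itself.

package main

import (
	"fmt"
	"runtime"
	"sync"
)

func main() {
	// pre-fill the work channel and close it, as protectTxnRefs does with
	// the drained references; workers exit when the channel is empty.
	work := make(chan int, 100)
	for i := 0; i < 100; i++ {
		work <- i
	}
	close(work)

	// half the CPUs with a floor of 2, the same heuristic as in the diff
	workers := runtime.NumCPU() / 2
	if workers < 2 {
		workers = 2
	}

	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for item := range work {
				_ = item // stand-in for s.doTxnProtect(c, markSet)
			}
		}()
	}
	wg.Wait()
	fmt.Println("all references protected")
}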