diff --git a/blockstore/splitstore/splitstore.go b/blockstore/splitstore/splitstore.go
index 2ccb2de1f..b16917621 100644
--- a/blockstore/splitstore/splitstore.go
+++ b/blockstore/splitstore/splitstore.go
@@ -6,6 +6,7 @@ import (
 	"encoding/binary"
 	"errors"
 	"os"
+	"runtime"
 	"sort"
 	"sync"
 	"sync/atomic"
@@ -228,11 +229,7 @@ func (s *SplitStore) Has(cid cid.Cid) (bool, error) {
 	}
 
 	if has {
-		err = s.trackTxnRef(cid)
-		if err != nil {
-			log.Warnf("error tracking reference to %s: %s", cid, err)
-		}
-
+		s.trackTxnRef(cid)
 		return true, nil
 	}
 
@@ -256,11 +253,7 @@ func (s *SplitStore) Get(cid cid.Cid) (blocks.Block, error) {
 
 	switch err {
 	case nil:
-		err = s.trackTxnRef(cid)
-		if err != nil {
-			log.Warnf("error tracking reference to %s: %s", cid, err)
-		}
-
+		s.trackTxnRef(cid)
 		return blk, nil
 
 	case bstore.ErrNotFound:
@@ -302,11 +295,7 @@ func (s *SplitStore) GetSize(cid cid.Cid) (int, error) {
 
 	switch err {
 	case nil:
-		err = s.trackTxnRef(cid)
-		if err != nil {
-			log.Warnf("error tracking reference to %s: %s", cid, err)
-		}
-
+		s.trackTxnRef(cid)
 		return size, nil
 
 	case bstore.ErrNotFound:
@@ -345,11 +334,7 @@ func (s *SplitStore) Put(blk blocks.Block) error {
 
 	s.debug.LogWrite(blk)
 
-	err = s.trackTxnRef(blk.Cid())
-	if err != nil {
-		log.Warnf("error tracking reference to %s: %s", blk.Cid(), err)
-	}
-
+	s.trackTxnRef(blk.Cid())
 	return nil
 }
 
@@ -394,11 +379,7 @@ func (s *SplitStore) PutMany(blks []blocks.Block) error {
 
 	s.debug.LogWriteMany(blks)
 
-	err = s.trackTxnRefMany(batch)
-	if err != nil {
-		log.Warnf("error tracking reference to batch: %s", err)
-	}
-
+	s.trackTxnRefMany(batch)
 	return nil
 }
 
@@ -461,11 +442,7 @@ func (s *SplitStore) View(cid cid.Cid, cb func([]byte) error) error {
 	// view can't have its data pointer deleted, which would be catastrophic.
 	// Note that we can't just RLock for the duration of the view, as this could
 	// lead to deadlock with recursive views.
-	wg, err := s.protectView(cid)
-	if err != nil {
-		log.Warnf("error protecting view to %s: %s", cid, err)
-	}
-
+	wg := s.protectView(cid)
 	if wg != nil {
 		defer wg.Done()
 	}
@@ -619,115 +596,191 @@ func (s *SplitStore) HeadChange(_, apply []*types.TipSet) error {
 // transactionally protect incoming tipsets
 func (s *SplitStore) protectTipSets(apply []*types.TipSet) {
 	s.txnLk.RLock()
+	defer s.txnLk.RUnlock()
+
 	if !s.txnActive {
-		s.txnLk.RUnlock()
 		return
 	}
 
-	// do this in a goroutine to avoid blocking the notifier
-	go func() {
-		defer s.txnLk.RUnlock()
-
-		var cids []cid.Cid
-		for _, ts := range apply {
-			cids = append(cids, ts.Cids()...)
-		}
-
-		err := s.trackTxnRefMany(cids)
-		if err != nil {
-			log.Errorf("error protecting newly applied tipsets: %s", err)
-		}
-	}()
+	var cids []cid.Cid
+	for _, ts := range apply {
+		cids = append(cids, ts.Cids()...)
+	}
+
+	s.trackTxnRefMany(cids)
 }
 
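The hot-path accessors above (`Has`, `Get`, `GetSize`, `Put`, `PutMany`, `View`) all switch from a fallible `trackTxnRef` to a void one: a failure to consult the markset now degrades into recording the reference for later batch protection, instead of surfacing a per-call warning. A minimal sketch of that record-now, protect-later shape, with illustrative stand-in names rather than the actual SplitStore API:

```go
// Sketch of the "record now, protect later" pattern; refTracker and its
// methods are hypothetical stand-ins, not splitstore types.
package main

import (
	"fmt"
	"sync"
)

type refTracker struct {
	mx   sync.Mutex
	refs map[string]struct{} // pending refs, analogous to s.txnRefs
}

// track never returns an error: at worst a ref is recorded redundantly and
// filtered out later when the batch is checked against the mark set.
func (t *refTracker) track(ref string) {
	t.mx.Lock()
	t.refs[ref] = struct{}{}
	t.mx.Unlock()
}

// drain atomically takes the pending set so a protector can walk it without
// blocking writers; callers loop until it comes back empty.
func (t *refTracker) drain() map[string]struct{} {
	t.mx.Lock()
	defer t.mx.Unlock()
	refs := t.refs
	t.refs = make(map[string]struct{})
	return refs
}

func main() {
	t := &refTracker{refs: make(map[string]struct{})}
	t.track("bafy...a")
	t.track("bafy...b")
	fmt.Println(len(t.drain()), "refs to protect") // 2 refs to protect
}
```

The pending map plays the role of `s.txnRefs`; `protectTxnRefs` below is the consumer that drains it.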
 // transactionally protect a view
-func (s *SplitStore) protectView(c cid.Cid) (*sync.WaitGroup, error) {
+func (s *SplitStore) protectView(c cid.Cid) *sync.WaitGroup {
 	s.txnLk.RLock()
 	defer s.txnLk.RUnlock()
 
 	if !s.txnActive {
 		s.txnViews.Add(1)
-		return s.txnViews, nil
+		return s.txnViews
 	}
 
-	err := s.trackTxnRef(c)
-	return nil, err
+	s.trackTxnRef(c)
+	return nil
 }
 
 // transactionally protect a reference to an object
-func (s *SplitStore) trackTxnRef(c cid.Cid) error {
+func (s *SplitStore) trackTxnRef(c cid.Cid) {
 	if !s.txnActive {
 		// not compacting
-		return nil
+		return
 	}
 
-	if s.txnRefs != nil {
-		// we haven't finished marking yet, so track the reference
-		s.txnRefsMx.Lock()
-		s.txnRefs[c] = struct{}{}
-		s.txnRefsMx.Unlock()
-		return nil
+	if isUnitaryObject(c) {
+		return
 	}
 
-	// we have finished marking, protect the reference
-	return s.doTxnProtect(c, nil)
+	if s.txnProtect != nil {
+		mark, err := s.txnProtect.Has(c)
+		if err != nil {
+			log.Warnf("error checking markset: %s", err)
+			goto track
+		}
+
+		if mark {
+			return
+		}
+	}
+
+track:
+	s.txnRefsMx.Lock()
+	s.txnRefs[c] = struct{}{}
+	s.txnRefsMx.Unlock()
+	return
 }
 
 // transactionally protect a batch of references
-func (s *SplitStore) trackTxnRefMany(cids []cid.Cid) error {
+func (s *SplitStore) trackTxnRefMany(cids []cid.Cid) {
 	if !s.txnActive {
 		// not compacting
-		return nil
+		return
 	}
 
-	if s.txnRefs != nil {
-		// we haven't finished marking yet, so track the references
+	s.txnRefsMx.Lock()
+	quiet := false
+	for _, c := range cids {
+		if isUnitaryObject(c) {
+			continue
+		}
+
+		if s.txnProtect != nil {
+			mark, err := s.txnProtect.Has(c)
+			if err != nil {
+				if !quiet {
+					quiet = true
+					log.Warnf("error checking markset: %s", err)
+				}
+				continue
+			}
+
+			if mark {
+				continue
+			}
+		}
+
+		s.txnRefs[c] = struct{}{}
+	}
+	s.txnRefsMx.Unlock()
+	return
+}
+
+// protect all pending transactional references
+func (s *SplitStore) protectTxnRefs(markSet MarkSet) error {
+	for {
+		var txnRefs map[cid.Cid]struct{}
+
 		s.txnRefsMx.Lock()
-		for _, c := range cids {
-			s.txnRefs[c] = struct{}{}
+		if len(s.txnRefs) > 0 {
+			txnRefs = s.txnRefs
+			s.txnRefs = make(map[cid.Cid]struct{})
 		}
 		s.txnRefsMx.Unlock()
-		return nil
-	}
 
-	// we have finished marking, protect the refs
-	batch := make(map[cid.Cid]struct{}, len(cids))
-	for _, c := range cids {
-		batch[c] = struct{}{}
-	}
-
-	for _, c := range cids {
-		err := s.doTxnProtect(c, batch)
-		if err != nil {
-			return err
+		if len(txnRefs) == 0 {
+			return nil
 		}
-	}
 
-	return nil
+		log.Infow("protecting transactional references", "refs", len(txnRefs))
+		count := 0
+		workch := make(chan cid.Cid, len(txnRefs))
+		startProtect := time.Now()
+
+		for c := range txnRefs {
+			mark, err := markSet.Has(c)
+			if err != nil {
+				return xerrors.Errorf("error checking markset: %w", err)
+			}
+
+			if mark {
+				continue
+			}
+
+			workch <- c
+			count++
+		}
+
+		if count == 0 {
+			return nil
+		}
+
+		workers := runtime.NumCPU() / 2
+		if workers < 2 {
+			workers = 2
+		}
+		if workers > count {
+			workers = count
+		}
+
+		close(workch)
+
+		worker := func(wg *sync.WaitGroup) {
+			if wg != nil {
+				defer wg.Done()
+			}
+
+			for c := range workch {
+				err := s.doTxnProtect(c, markSet)
+				if err != nil {
+					log.Warnf("error protecting transactional references: %s", err)
+					return
+				}
+			}
+		}
+
+		if workers > 1 {
+			wg := new(sync.WaitGroup)
+			for i := 0; i < workers; i++ {
+				wg.Add(1)
+				go worker(wg)
+			}
+			wg.Wait()
+		} else {
+			worker(nil)
+		}
+
+		log.Infow("protecting transactional refs done", "took", time.Since(startProtect), "protected", count)
+	}
 }
 
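`protectTxnRefs` drains `s.txnRefs` in a loop and fans the still-unmarked refs out to a capped worker pool: the channel is sized to the batch and closed before any worker starts, so nothing can block on send. A self-contained sketch of the same bounded fan-out pattern, where the `protect` callback stands in for `doTxnProtect`:

```go
// Sketch of the pre-filled, closed-channel worker pool used by protectTxnRefs.
// protectAll and its callback are illustrative stand-ins.
package main

import (
	"fmt"
	"runtime"
	"sync"
)

func protectAll(items []string, protect func(string) error) {
	// buffer the whole batch, then close: no producer goroutine needed
	workch := make(chan string, len(items))
	for _, it := range items {
		workch <- it
	}
	close(workch)

	// half the CPUs, but at least 2 and no more than the batch size
	workers := runtime.NumCPU() / 2
	if workers < 2 {
		workers = 2
	}
	if workers > len(items) {
		workers = len(items)
	}

	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for it := range workch {
				if err := protect(it); err != nil {
					fmt.Println("protect failed:", err)
					return // this worker stops; the others keep draining
				}
			}
		}()
	}
	wg.Wait()
}

func main() {
	protectAll([]string{"a", "b", "c"}, func(s string) error {
		fmt.Println("protected", s)
		return nil
	})
}
```

The outer `for` loop in `protectTxnRefs` matters: references can keep arriving while a batch is being protected, so the function loops until a drain comes back empty.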
 // transactionally protect a reference by walking the object and marking.
 // concurrent markings are short circuited by checking the markset.
-func (s *SplitStore) doTxnProtect(root cid.Cid, batch map[cid.Cid]struct{}) error {
+func (s *SplitStore) doTxnProtect(root cid.Cid, markSet MarkSet) error {
 	// Note: cold objects are deleted heaviest first, so the constituents of an object
 	// cannot be deleted before the object itself.
-	err := s.walkObjectIncomplete(root, cid.NewSet(),
+	return s.walkObjectIncomplete(root, cid.NewSet(),
 		func(c cid.Cid) error {
 			if isUnitaryObject(c) {
 				return errStopWalk
 			}
 
-			if c != root {
-				_, ok := batch[c]
-				if ok {
-					// it's on the same batch, stop walk
-					return errStopWalk
-				}
-			}
-
-			mark, err := s.txnProtect.Has(c)
+			mark, err := markSet.Has(c)
 			if err != nil {
-				return xerrors.Errorf("error checking mark set for %s: %w", c, err)
+				return xerrors.Errorf("error checking markset: %w", err)
 			}
 
 			// it's marked, nothing to do
@@ -735,23 +788,17 @@ func (s *SplitStore) doTxnProtect(root cid.Cid, batch map[cid.Cid]struct{}) erro
 				return errStopWalk
 			}
 
-			return s.txnProtect.Mark(c)
+			return markSet.Mark(c)
 		},
 		func(c cid.Cid) error {
-			log.Warnf("missing object reference %s in %s", c, root)
 			if s.txnMissing != nil {
+				log.Warnf("missing object reference %s in %s", c, root)
 				s.txnRefsMx.Lock()
 				s.txnMissing[c] = struct{}{}
 				s.txnRefsMx.Unlock()
 			}
 			return errStopWalk
 		})
-
-	if err != nil {
-		log.Warnf("error protecting object (cid: %s): %s", root, err)
-	}
-
-	return err
 }
 
 // warmup acquires the compaction lock and spawns a goroutine to warm up the hotstore;
@@ -905,6 +952,9 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
 		return err
 	}
 
+	// we are ready for concurrent marking
+	s.beginTxnMarking(markSet)
+
 	// 1. mark reachable objects by walking the chain from the current epoch; we keep state roots
 	// and messages until the boundary epoch.
 	log.Info("marking reachable objects")
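`beginTxnMarking` flips the markset into concurrent mode and publishes it as `s.txnProtect` before the chain walk starts, so the compactor and live writers can share a single set. A rough sketch of what a `SetConcurrent`-style toggle plausibly does; this is an illustrative stand-in, not the splitstore's MarkSet implementations (which live elsewhere and are not part of this diff):

```go
// Hypothetical markset with an opt-in locking mode, sketching the assumed
// semantics of SetConcurrent: same Has/Mark surface, mutex-guarded once
// concurrent marking begins.
package main

import "sync"

type markSet struct {
	mx         sync.Mutex
	concurrent bool
	marks      map[string]struct{}
}

func newMarkSet() *markSet { return &markSet{marks: make(map[string]struct{})} }

// SetConcurrent switches the set into locked mode before it is shared
// between the compactor and live writers.
func (m *markSet) SetConcurrent() { m.concurrent = true }

func (m *markSet) Mark(c string) {
	if m.concurrent {
		m.mx.Lock()
		defer m.mx.Unlock()
	}
	m.marks[c] = struct{}{}
}

func (m *markSet) Has(c string) bool {
	if m.concurrent {
		m.mx.Lock()
		defer m.mx.Unlock()
	}
	_, ok := m.marks[c]
	return ok
}

func main() {
	ms := newMarkSet()
	ms.SetConcurrent()
	ms.Mark("bafy...root")
	_ = ms.Has("bafy...root") // true
}
```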
log.Info("marking reachable objects") @@ -933,66 +983,10 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { return err } - // begin transactional protection with concurrent marking and fetch references created while marking - txnRefs := s.beginTxnConcurrentMarking(markSet) - - // 1.1 Update markset for references created during marking - if len(txnRefs) > 0 { - log.Infow("updating mark set for live references", "refs", len(txnRefs)) - startMark = time.Now() - walked := cid.NewSet() - count = 0 - - for c := range txnRefs { - if err := s.checkClosing(); err != nil { - return err - } - - if isUnitaryObject(c) { - continue - } - - mark, err := markSet.Has(c) - if err != nil { - return xerrors.Errorf("error checking markset for %s: %w", c, err) - } - - if mark { - continue - } - - err = s.walkObjectIncomplete(c, walked, - func(c cid.Cid) error { - if isUnitaryObject(c) { - return errStopWalk - } - - mark, err := markSet.Has(c) - if err != nil { - return xerrors.Errorf("error checking markset for %s: %w", c, err) - } - - if mark { - return errStopWalk - } - - count++ - return markSet.Mark(c) - }, - func(cm cid.Cid) error { - log.Warnf("missing object reference %s in %s", cm, c) //nolint - s.txnRefsMx.Lock() - s.txnMissing[cm] = struct{}{} - s.txnRefsMx.Unlock() - return errStopWalk - }) - - if err != nil { - return xerrors.Errorf("error walking %s for marking: %w", c, err) - } - } - - log.Infow("update mark set done", "took", time.Since(startMark), "marked", count) + // 1.1 protect transactional refs + err = s.protectTxnRefs(markSet) + if err != nil { + return xerrors.Errorf("error protecting transactional refs: %w", err) } if err := s.checkClosing(); err != nil { @@ -1047,7 +1041,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { // now that we have collected cold objects, check for missing references from transactional i/o // and disable further collection of such references (they will not be acted upon as we can't // possibly delete objects we didn't have when we were collecting cold objects) - s.waitForMissingRefs() + s.waitForMissingRefs(markSet) if err := s.checkClosing(); err != nil { return err @@ -1062,6 +1056,10 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { return xerrors.Errorf("error moving cold objects: %w", err) } log.Infow("moving done", "took", time.Since(startMove)) + + if err := s.checkClosing(); err != nil { + return err + } } // 4. sort cold objects so that the dags with most references are deleted first @@ -1075,6 +1073,16 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { } log.Infow("sorting done", "took", time.Since(startSort)) + // 4.1 protect transactional refs once more + err = s.protectTxnRefs(markSet) + if err != nil { + return xerrors.Errorf("error protecting transactional refs: %w", err) + } + + if err := s.checkClosing(); err != nil { + return err + } + // Enter critical section log.Info("entering critical section") atomic.StoreInt32(&s.critsection, 1) @@ -1088,7 +1096,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { // 5. 
@@ -1121,6 +1129,7 @@ func (s *SplitStore) beginTxnProtect(curTs *types.TipSet) *sync.WaitGroup {
 	s.txnActive = true
 	s.txnLookbackEpoch = lookbackEpoch
 	s.txnRefs = make(map[cid.Cid]struct{})
+	s.txnMissing = make(map[cid.Cid]struct{})
 
 	wg := s.txnViews
 	s.txnViews = nil
@@ -1128,18 +1137,12 @@
 	return wg
 }
 
-func (s *SplitStore) beginTxnConcurrentMarking(markSet MarkSet) map[cid.Cid]struct{} {
-	s.txnLk.Lock()
-	defer s.txnLk.Unlock()
-
+func (s *SplitStore) beginTxnMarking(markSet MarkSet) {
 	markSet.SetConcurrent()
 
-	txnRefs := s.txnRefs
-	s.txnRefs = nil
-	s.txnMissing = make(map[cid.Cid]struct{})
+	s.txnLk.Lock()
 	s.txnProtect = markSet
-
-	return txnRefs
+	s.txnLk.Unlock()
 }
 
 func (s *SplitStore) endTxnProtect() {
@@ -1150,7 +1153,6 @@
 		return
 	}
 
-	_ = s.txnProtect.Close()
 	s.txnActive = false
 	s.txnProtect = nil
 	s.txnRefs = nil
@@ -1508,7 +1510,7 @@ func (s *SplitStore) purgeBatch(cids []cid.Cid, deleteBatch func([]cid.Cid) erro
 	return nil
 }
 
-func (s *SplitStore) purge(cids []cid.Cid) error {
+func (s *SplitStore) purge(cids []cid.Cid, markSet MarkSet) error {
 	deadCids := make([]cid.Cid, 0, batchSize)
 	var purgeCnt, liveCnt int
 	defer func() {
@@ -1519,15 +1521,26 @@
 		func(cids []cid.Cid) error {
 			deadCids := deadCids[:0]
 
+		again:
 			if err := s.checkClosing(); err != nil {
 				return err
 			}
 
 			s.txnLk.Lock()
-			defer s.txnLk.Unlock()
+			if len(s.txnRefs) > 0 {
+				s.txnLk.Unlock()
+				err := s.protectTxnRefs(markSet)
+				if err != nil {
+					return xerrors.Errorf("error protecting transactional refs: %w", err)
+				}
+
+				goto again
+			}
+
+			defer s.txnLk.Unlock()
 
 			for _, c := range cids {
-				live, err := s.txnProtect.Has(c)
+				live, err := markSet.Has(c)
 				if err != nil {
 					return xerrors.Errorf("error checking for liveness: %w", err)
 				}
@@ -1559,7 +1572,7 @@ func (s *SplitStore) purge(cids []cid.Cid) error {
 // have this gem[TM].
 // My best guess is that they are parent message receipts or yet to be computed state roots; magik
 // thinks the cause may be block validation.
-func (s *SplitStore) waitForMissingRefs() {
+func (s *SplitStore) waitForMissingRefs(markSet MarkSet) {
 	s.txnLk.Lock()
 	missing := s.txnMissing
 	s.txnMissing = nil
@@ -1598,7 +1611,7 @@ func (s *SplitStore) waitForMissingRefs() {
 					return errStopWalk
 				}
 
-				mark, err := s.txnProtect.Has(c)
+				mark, err := markSet.Has(c)
 				if err != nil {
 					return xerrors.Errorf("error checking markset for %s: %w", c, err)
 				}
@@ -1608,7 +1621,7 @@
 				}
 
 				count++
-				return s.txnProtect.Mark(c)
+				return markSet.Mark(c)
 			},
 			func(c cid.Cid) error {
 				missing[c] = struct{}{}
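For orientation, the compaction flow after this diff, reduced to a runnable skeleton with stubbed steps. The names mirror the diff but every body is elided; this is an orientation sketch, not compilable lotus code:

```go
// Stub lifecycle mirroring doCompact after this diff: marking begins inside
// the transaction, pending refs are protected both after marking and after
// sorting, and the markset (no longer s.txnProtect) is threaded into purge
// and waitForMissingRefs.
package main

import "fmt"

func doCompact() error {
	beginTxnMarking() // publish the markset for concurrent protection
	mark()            // 1. walk the chain and mark reachable objects
	if err := protectTxnRefs(); err != nil { // 1.1 refs recorded during marking
		return err
	}
	collectCold() // 2-3. collect and move cold objects
	sortCold()    // 4. heaviest-first deletion order
	if err := protectTxnRefs(); err != nil { // 4.1 refs recorded meanwhile
		return err
	}
	purge() // 5. delete, re-protecting any stragglers per batch
	return nil
}

func beginTxnMarking()      { fmt.Println("begin marking") }
func mark()                 { fmt.Println("mark reachable") }
func protectTxnRefs() error { fmt.Println("protect txn refs"); return nil }
func collectCold()          { fmt.Println("collect+move cold") }
func sortCold()             { fmt.Println("sort cold") }
func purge()                { fmt.Println("purge") }

func main() { _ = doCompact() }
```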