diff --git a/blockstore/splitstore/debug.go b/blockstore/splitstore/debug.go index 47d61816f..18ea436da 100644 --- a/blockstore/splitstore/debug.go +++ b/blockstore/splitstore/debug.go @@ -18,6 +18,7 @@ import ( "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/lotus/chain/types" + blocks "github.com/ipfs/go-block-format" cid "github.com/ipfs/go-cid" ) @@ -102,7 +103,7 @@ func (d *debugLog) LogReadMiss(curTs *types.TipSet, cid cid.Cid) { } } -func (d *debugLog) LogWrite(curTs *types.TipSet, c cid.Cid, writeEpoch abi.ChainEpoch) { +func (d *debugLog) LogWrite(curTs *types.TipSet, blk blocks.Block, writeEpoch abi.ChainEpoch) { if d == nil { return } @@ -122,13 +123,13 @@ func (d *debugLog) LogWrite(curTs *types.TipSet, c cid.Cid, writeEpoch abi.Chain d.writeCnt++ - _, err := fmt.Fprintf(d.writeLog, "%s %d %s %d%s\n", d.timestamp(), curEpoch, c, writeEpoch, stack) + _, err := fmt.Fprintf(d.writeLog, "%s %d %s %d%s\n", d.timestamp(), curEpoch, blk.Cid(), writeEpoch, stack) if err != nil { log.Warnf("error writing write log: %s", err) } } -func (d *debugLog) LogWriteMany(curTs *types.TipSet, cids []cid.Cid, writeEpoch abi.ChainEpoch) { +func (d *debugLog) LogWriteMany(curTs *types.TipSet, blks []blocks.Block, writeEpoch abi.ChainEpoch) { if d == nil { return } @@ -146,11 +147,11 @@ func (d *debugLog) LogWriteMany(curTs *types.TipSet, cids []cid.Cid, writeEpoch d.writeMx.Lock() defer d.writeMx.Unlock() - d.writeCnt += len(cids) + d.writeCnt += len(blks) now := d.timestamp() - for _, c := range cids { - _, err := fmt.Fprintf(d.writeLog, "%s %d %s %d%s\n", now, curEpoch, c, writeEpoch, stack) + for _, blk := range blks { + _, err := fmt.Fprintf(d.writeLog, "%s %d %s %d%s\n", now, curEpoch, blk.Cid(), writeEpoch, stack) if err != nil { log.Warnf("error writing write log: %s", err) break diff --git a/blockstore/splitstore/splitstore.go b/blockstore/splitstore/splitstore.go index 2353181fb..8a32f359d 100644 --- a/blockstore/splitstore/splitstore.go 
+++ b/blockstore/splitstore/splitstore.go @@ -103,16 +103,11 @@ type Config struct { // Supported values are: "bloom" (default if omitted), "bolt". MarkSetType string - // HotHeaders indicates whether to keep chain block headers in hotstore or not. - // This is necessary, and automatically set by DI in lotus node construction, if - // you are running with a noop coldstore. - HotHeaders bool - // SkipMoveColdBlocks indicates whether to skip moving cold blocks to the coldstore. // If the splitstore is running with a noop coldstore then this option is set to true // which skips moving (as it is a noop, but still takes time to read all the cold objects) // and directly purges cold blocks. - SkipMoveColdBlocks bool + DiscardColdBlocks bool } // ChainAccessor allows the Splitstore to access the chain. It will most likely @@ -140,11 +135,10 @@ type SplitStore struct { mx sync.Mutex curTs *types.TipSet - chain ChainAccessor - ds dstore.Datastore - hot bstore.Blockstore - cold bstore.Blockstore - tracker TrackingStore + chain ChainAccessor + ds dstore.Datastore + hot bstore.Blockstore + cold bstore.Blockstore markSetEnv MarkSetEnv markSetSize int64 @@ -159,9 +153,8 @@ type SplitStore struct { txnEnv MarkSetEnv txnProtect MarkSet txnMarkSet MarkSet - - // pending write set - pendingWrites map[cid.Cid]struct{} + txnRefs map[cid.Cid]struct{} + txnActive bool } var _ bstore.Blockstore = (*SplitStore)(nil) @@ -170,23 +163,20 @@ var _ bstore.Blockstore = (*SplitStore)(nil) // is backed by the provided hot and cold stores. The returned SplitStore MUST be // attached to the ChainStore with Start in order to trigger compaction. 
func Open(path string, ds dstore.Datastore, hot, cold bstore.Blockstore, cfg *Config) (*SplitStore, error) { - // the tracking store - tracker, err := OpenTrackingStore(path, cfg.TrackingStoreType) - if err != nil { - return nil, err + // hot blockstore must support BlockstoreIterator + if _, ok := hot.(bstore.BlockstoreIterator); !ok { + return nil, xerrors.Errorf("hot blockstore does not support efficient iteration: %T", hot) } // the markset env - markSetEnv, err := OpenMarkSetEnv(path, "bolt") + markSetEnv, err := OpenMarkSetEnv(path, "mapts") if err != nil { - _ = tracker.Close() return nil, err } // the txn markset env txnEnv, err := OpenMarkSetEnv(path, "mapts") if err != nil { - _ = tracker.Close() _ = markSetEnv.Close() return nil, err } @@ -197,13 +187,10 @@ func Open(path string, ds dstore.Datastore, hot, cold bstore.Blockstore, cfg *Co ds: ds, hot: hot, cold: cold, - tracker: tracker, markSetEnv: markSetEnv, txnEnv: txnEnv, coldPurgeSize: defaultColdPurgeSize, - - pendingWrites: make(map[cid.Cid]struct{}), } ss.ctx, ss.cancel = context.WithCancel(context.Background()) @@ -244,13 +231,6 @@ func (s *SplitStore) Has(c cid.Cid) (bool, error) { // -- the vm uses this check to avoid duplicate writes on Copy. // When we have options in the API (or something better), the vm can explicitly signal // that this is an implicit Write. - s.trackWrite(c) - - // also make sure the object is considered live during compaction in case we have already - // flushed pending writes and started compaction. - // when within vm copy context, dags will be recursively referenced. - // in case of a race with purge, this will return a track error, which we can use to - // signal to the vm that the object is not fully present. 
err = s.trackTxnRef(c, true) if xerrors.Is(err, errMissingObject) { // we failed to recursively protect the object because some inner object has been purged; @@ -276,12 +256,14 @@ func (s *SplitStore) Get(cid cid.Cid) (blocks.Block, error) { return blk, err case bstore.ErrNotFound: - s.mx.Lock() - warmup := s.warmupEpoch > 0 - curTs := s.curTs - s.mx.Unlock() - if warmup { - s.debug.LogReadMiss(curTs, cid) + if s.debug != nil { + s.mx.Lock() + warm := s.warmupEpoch > 0 + curTs := s.curTs + s.mx.Unlock() + if warm { + s.debug.LogReadMiss(curTs, cid) + } } blk, err = s.cold.Get(cid) @@ -308,12 +290,14 @@ func (s *SplitStore) GetSize(cid cid.Cid) (int, error) { return size, err case bstore.ErrNotFound: - s.mx.Lock() - warmup := s.warmupEpoch > 0 - curTs := s.curTs - s.mx.Unlock() - if warmup { - s.debug.LogReadMiss(curTs, cid) + if s.debug != nil { + s.mx.Lock() + warm := s.warmupEpoch > 0 + curTs := s.curTs + s.mx.Unlock() + if warm { + s.debug.LogReadMiss(curTs, cid) + } } size, err = s.cold.GetSize(cid) @@ -333,7 +317,13 @@ func (s *SplitStore) Put(blk blocks.Block) error { err := s.hot.Put(blk) if err == nil { - s.trackWrite(blk.Cid()) + if s.debug != nil { + s.mx.Lock() + curTs := s.curTs + writeEpoch := s.writeEpoch + s.mx.Unlock() + s.debug.LogWrite(curTs, blk, writeEpoch) + } err = s.trackTxnRef(blk.Cid(), false) } @@ -351,7 +341,14 @@ func (s *SplitStore) PutMany(blks []blocks.Block) error { err := s.hot.PutMany(blks) if err == nil { - s.trackWriteMany(batch) + if s.debug != nil { + s.mx.Lock() + curTs := s.curTs + writeEpoch := s.writeEpoch + s.mx.Unlock() + s.debug.LogWriteMany(curTs, blks, writeEpoch) + } + err = s.trackTxnRefMany(batch) } @@ -408,12 +405,14 @@ func (s *SplitStore) View(cid cid.Cid, cb func([]byte) error) error { return err case bstore.ErrNotFound: - s.mx.Lock() - warmup := s.warmupEpoch > 0 - curTs := s.curTs - s.mx.Unlock() - if warmup { - s.debug.LogReadMiss(curTs, cid) + if s.debug != nil { + s.mx.Lock() + warm := s.warmupEpoch > 0 + 
curTs := s.curTs + s.mx.Unlock() + if warm { + s.debug.LogReadMiss(curTs, cid) + } } err = s.cold.View(cid, cb) @@ -485,11 +484,11 @@ func (s *SplitStore) Start(chain ChainAccessor) error { return xerrors.Errorf("error loading mark set size: %w", err) } - s.updateWriteEpoch() + log.Infow("starting splitstore", "baseEpoch", s.baseEpoch, "warmupEpoch", s.warmupEpoch) - log.Infow("starting splitstore", "baseEpoch", s.baseEpoch, "warmupEpoch", s.warmupEpoch, "writeEpoch", s.writeEpoch) - - go s.background() + if s.debug != nil { + go s.background() + } // watch the chain chain.SubscribeHeadChanges(s.HeadChange) @@ -507,9 +506,8 @@ func (s *SplitStore) Close() error { } } - s.flushPendingWrites(false) s.cancel() - return multierr.Combine(s.tracker.Close(), s.markSetEnv.Close(), s.debug.Close()) + return multierr.Combine(s.markSetEnv.Close(), s.debug.Close()) } func (s *SplitStore) HeadChange(_, apply []*types.TipSet) error { @@ -524,8 +522,6 @@ func (s *SplitStore) HeadChange(_, apply []*types.TipSet) error { s.curTs = curTs s.mx.Unlock() - s.updateWriteEpoch() - timestamp := time.Unix(int64(curTs.MinTimestamp()), 0) if time.Since(timestamp) > SyncGapTime { // don't attempt compaction before we have caught up syncing @@ -557,6 +553,21 @@ func (s *SplitStore) HeadChange(_, apply []*types.TipSet) error { return nil } +func (s *SplitStore) background() { + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + + for { + select { + case <-s.ctx.Done(): + return + + case <-ticker.C: + s.updateWriteEpoch() + } + } +} + func (s *SplitStore) updateWriteEpoch() { s.mx.Lock() defer s.mx.Unlock() @@ -568,7 +579,6 @@ func (s *SplitStore) updateWriteEpoch() { if dt < 0 { writeEpoch := curTs.Height() + 1 if writeEpoch > s.writeEpoch { - s.flushPendingWrites(true) s.writeEpoch = writeEpoch } @@ -577,67 +587,29 @@ func (s *SplitStore) updateWriteEpoch() { writeEpoch := curTs.Height() + abi.ChainEpoch(dt.Seconds())/builtin.EpochDurationSeconds + 1 if writeEpoch > s.writeEpoch { - 
s.flushPendingWrites(true) s.writeEpoch = writeEpoch } } -// Unfortunately we can't just directly tracker.Put one by one, as it is ridiculously slow with -// bbolt because of syncing (order of 10ms), so we batch them. -func (s *SplitStore) trackWrite(c cid.Cid) { - s.mx.Lock() - defer s.mx.Unlock() - - s.pendingWrites[c] = struct{}{} -} - -// and also combine batch writes into them -func (s *SplitStore) trackWriteMany(cids []cid.Cid) { - s.mx.Lock() - defer s.mx.Unlock() - - for _, c := range cids { - s.pendingWrites[c] = struct{}{} - } -} - -func (s *SplitStore) flushPendingWrites(locked bool) { - if !locked { - s.mx.Lock() - defer s.mx.Unlock() - } - - if len(s.pendingWrites) == 0 { - return - } - - cids := make([]cid.Cid, 0, len(s.pendingWrites)) - for c := range s.pendingWrites { - cids = append(cids, c) - } - s.pendingWrites = make(map[cid.Cid]struct{}) - - epoch := s.writeEpoch - err := s.tracker.PutBatch(cids, epoch) - if err != nil { - log.Errorf("error putting write batch to tracker: %s", err) - } - - s.debug.LogWriteMany(s.curTs, cids, epoch) -} - func (s *SplitStore) trackTxnRef(c cid.Cid, recursive bool) error { - if s.txnProtect == nil { + if !s.txnActive { // not compacting return nil } + if s.txnRefs != nil { + // we haven't finished marking yet, so track the reference + s.txnRefs[c] = struct{}{} + return nil + } + + // we have finished marking, protect the reference if !recursive { return s.txnProtect.Mark(c) } // it's a recursive reference in vm context, protect links if they are not in the markset already - return s.walkLinks(c, cid.NewSet(), func(c cid.Cid) error { + return s.walkObject(c, cid.NewSet(), func(c cid.Cid) error { mark, err := s.txnMarkSet.Has(c) if err != nil { return xerrors.Errorf("error checking mark set for %s: %w", c, err) @@ -675,7 +647,7 @@ func (s *SplitStore) trackTxnRef(c cid.Cid, recursive bool) error { } func (s *SplitStore) trackTxnRefMany(cids []cid.Cid) error { - if s.txnProtect == nil { + if !s.txnActive { // not 
compacting return nil } @@ -691,27 +663,7 @@ func (s *SplitStore) trackTxnRefMany(cids []cid.Cid) error { return err } -func (s *SplitStore) background() { - ticker := time.NewTicker(time.Second) - defer ticker.Stop() - - for { - select { - case <-s.ctx.Done(): - return - - case <-ticker.C: - s.updateWriteEpoch() - } - } -} - func (s *SplitStore) warmup(curTs *types.TipSet) error { - err := s.loadGenesisState() - if err != nil { - return xerrors.Errorf("error loading genesis state: %w", err) - } - if !atomic.CompareAndSwapInt32(&s.compacting, 0, 1) { return xerrors.Errorf("error locking compaction") } @@ -722,7 +674,7 @@ func (s *SplitStore) warmup(curTs *types.TipSet) error { log.Info("warming up hotstore") start := time.Now() - err = s.doWarmup(curTs) + err := s.doWarmup(curTs) if err != nil { log.Errorf("error warming up hotstore: %s", err) return @@ -734,75 +686,13 @@ func (s *SplitStore) warmup(curTs *types.TipSet) error { return nil } -func (s *SplitStore) loadGenesisState() error { - // makes sure the genesis and its state root are hot - gb, err := s.chain.GetGenesis() - if err != nil { - return xerrors.Errorf("error getting genesis: %w", err) - } - - genesis := gb.Cid() - genesisStateRoot := gb.ParentStateRoot - - has, err := s.hot.Has(genesis) - if err != nil { - return xerrors.Errorf("error checking hotstore for genesis: %w", err) - } - - if !has { - blk, err := gb.ToStorageBlock() - if err != nil { - return xerrors.Errorf("error converting genesis block to storage block: %w", err) - } - - err = s.hot.Put(blk) - if err != nil { - return xerrors.Errorf("error putting genesis block to hotstore: %w", err) - } - } - - err = s.walkLinks(genesisStateRoot, cid.NewSet(), func(c cid.Cid) error { - has, err = s.hot.Has(c) - if err != nil { - return xerrors.Errorf("error checking hotstore for genesis state root: %w", err) - } - - if !has { - blk, err := s.cold.Get(c) - if err != nil { - if err == bstore.ErrNotFound { - return nil - } - - return xerrors.Errorf("error 
retrieving genesis state linked object from coldstore: %w", err) - } - - err = s.hot.Put(blk) - if err != nil { - return xerrors.Errorf("error putting genesis state linked object to hotstore: %w", err) - } - } - - return nil - }) - - if err != nil { - return xerrors.Errorf("error walking genesis state root links: %w", err) - } - - return nil -} - func (s *SplitStore) doWarmup(curTs *types.TipSet) error { epoch := curTs.Height() - batchHot := make([]blocks.Block, 0, batchSize) - batchSnoop := make([]cid.Cid, 0, batchSize) - count := int64(0) xcount := int64(0) missing := int64(0) - err := s.walk(curTs, epoch, false, s.cfg.HotHeaders, + err := s.walkChain(curTs, epoch, false, func(cid cid.Cid) error { count++ @@ -827,15 +717,7 @@ func (s *SplitStore) doWarmup(curTs *types.TipSet) error { xcount++ batchHot = append(batchHot, blk) - batchSnoop = append(batchSnoop, cid) - if len(batchHot) == batchSize { - err = s.tracker.PutBatch(batchSnoop, epoch) - if err != nil { - return err - } - batchSnoop = batchSnoop[:0] - err = s.hot.PutMany(batchHot) if err != nil { return err @@ -851,11 +733,6 @@ func (s *SplitStore) doWarmup(curTs *types.TipSet) error { } if len(batchHot) > 0 { - err = s.tracker.PutBatch(batchSnoop, epoch) - if err != nil { - return err - } - err = s.hot.PutMany(batchHot) if err != nil { return err @@ -885,22 +762,8 @@ func (s *SplitStore) doWarmup(curTs *types.TipSet) error { // Compaction/GC Algorithm func (s *SplitStore) compact(curTs *types.TipSet) { - var err error - if s.markSetSize == 0 { - start := time.Now() - log.Info("estimating mark set size") - err = s.estimateMarkSetSize(curTs) - if err != nil { - log.Errorf("error estimating mark set size: %s; aborting compaction", err) - return - } - log.Infow("estimating mark set size done", "took", time.Since(start), "size", s.markSetSize) - } else { - log.Infow("current mark set size estimate", "size", s.markSetSize) - } - start := time.Now() - err = s.doCompact(curTs) + err := s.doCompact(curTs) took := 
time.Since(start).Milliseconds() stats.Record(context.Background(), metrics.SplitstoreCompactionTimeSeconds.M(float64(took)/1e3)) @@ -909,24 +772,6 @@ func (s *SplitStore) compact(curTs *types.TipSet) { } } -func (s *SplitStore) estimateMarkSetSize(curTs *types.TipSet) error { - epoch := curTs.Height() - - var count int64 - err := s.walk(curTs, epoch, false, s.cfg.HotHeaders, - func(cid cid.Cid) error { - count++ - return nil - }) - - if err != nil { - return err - } - - s.markSetSize = count + count>>2 // overestimate a bit - return nil -} - func (s *SplitStore) doCompact(curTs *types.TipSet) error { currentEpoch := curTs.Height() boundaryEpoch := currentEpoch - CompactionBoundary @@ -941,12 +786,18 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { defer markSet.Close() //nolint:errcheck defer s.debug.Flush() + // 0. Prepare the transaction + s.txnLk.Lock() + s.txnRefs = make(map[cid.Cid]struct{}) + s.txnActive = true + s.txnLk.Unlock() + // 1. mark reachable objects by walking the chain from the current epoch to the boundary epoch log.Infow("marking reachable blocks", "currentEpoch", currentEpoch, "boundaryEpoch", boundaryEpoch) startMark := time.Now() var count int64 - err = s.walk(curTs, boundaryEpoch, true, s.cfg.HotHeaders, + err = s.walkChain(curTs, boundaryEpoch, true, func(c cid.Cid) error { count++ return markSet.Mark(c) @@ -962,8 +813,10 @@ log.Infow("marking done", "took", time.Since(startMark), "marked", count) - // create the transaction protect filter + // fetch references taken during marking and create the transaction protect filter s.txnLk.Lock() + txnRefs := s.txnRefs + s.txnRefs = nil s.txnProtect, err = s.txnEnv.Create("protected", 0) if err != nil { s.txnLk.Unlock() @@ -975,28 +828,56 @@ defer func() { s.txnLk.Lock() _ = s.txnProtect.Close() + s.txnActive = false s.txnProtect = nil s.txnMarkSet = nil 
s.txnLk.Unlock() }() - // flush pending writes to update the tracker - s.flushPendingWrites(false) + // 1.1 Update markset for references created during marking + log.Info("updating mark set for live references") + startMark = time.Now() + walked := cid.NewSet() + count = 0 + for c := range txnRefs { + mark, err := markSet.Has(c) + if err != nil { + return xerrors.Errorf("error checking markset for %s: %w", c, err) + } - // 2. move cold unreachable objects to the coldstore - log.Info("collecting candidate cold objects") + if mark { + continue + } + + err = s.walkObject(c, walked, func(c cid.Cid) error { + mark, err := markSet.Has(c) + if err != nil { + return xerrors.Errorf("error checking markset for %s: %w", c, err) + } + + if mark { + return errStopWalk + } + + count++ + return markSet.Mark(c) + }) + + if err != nil { + return xerrors.Errorf("error walking %s for marking: %w", c, err) + } + } + log.Infow("update marking set done", "took", time.Since(startMark), "marked", count) + + // 2. iterate through the hotstore to collect cold objects + log.Info("collecting cold objects") startCollect := time.Now() - candidates := make(map[cid.Cid]struct{}, s.coldPurgeSize) - var towalk []cid.Cid - // some stats for logging - var hotCnt, coldCnt, slackCnt, liveCnt int + var hotCnt, coldCnt int - // 2.1 iterate through the tracking store and collect unreachable cold objects - // for every hot object that is a dag and not in the markset, walk for links and - // and mark reachable objects - err = s.tracker.ForEach(func(c cid.Cid, writeEpoch abi.ChainEpoch) error { + cold := make([]cid.Cid, 0, s.coldPurgeSize) + err = s.hot.(bstore.BlockstoreIterator).ForEachKey(func(c cid.Cid) error { // was it marked? mark, err := markSet.Has(c) if err != nil { @@ -1008,30 +889,8 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { return nil } - // is the object still hot? 
- if writeEpoch >= boundaryEpoch { - // yes, stay in the hotstore - hotCnt++ - - // if it is a DAG, add it to the walk list to recursively update the markset - if c.Prefix().Codec != cid.DagCBOR { - return nil - } - - towalk = append(towalk, c) - return nil - } - - // is the object in slack region? - if writeEpoch > coldEpoch { - // yes stay in the hotstore, but we wont walk you - slackCnt++ - - return nil - } - // it's cold, mark it as candidate for move - candidates[c] = struct{}{} + cold = append(cold, c) coldCnt++ return nil @@ -1047,55 +906,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { s.coldPurgeSize = coldCnt + coldCnt>>2 // overestimate a bit } - // walk hot dags that were not marked and recursively update the mark set - log.Info("updating mark set for hot dags") - startMark = time.Now() - - count = 0 - walked := cid.NewSet() - for _, c := range towalk { - err = s.walkLinks(c, walked, func(c cid.Cid) error { - mark, err := markSet.Has(c) - if err != nil { - return xerrors.Errorf("error checking mark set for %s: %w", c, err) - } - - if mark { - // already marked, don't recurse its links - return errStopWalk - } - - count++ - return markSet.Mark(c) - }) - - if err != nil { - return xerrors.Errorf("error walking %s: %w", c, err) - } - } - - log.Infow("updating mark set done", "took", time.Since(startMark), "marked", count) - - // filter the candidate set for objects newly marked as hot - for c := range candidates { - mark, err := markSet.Has(c) - if err != nil { - return xerrors.Errorf("error checking mark set for %s: %w", c, err) - } - - if mark { - delete(candidates, c) - liveCnt++ - } - } - - // create the cold object list - cold := make([]cid.Cid, 0, len(candidates)) - for c := range candidates { - cold = append(cold, c) - } - - log.Infow("compaction stats", "hot", hotCnt, "cold", coldCnt, "live", liveCnt, "slack", slackCnt) + log.Infow("compaction stats", "hot", hotCnt, "cold", coldCnt) stats.Record(context.Background(), 
metrics.SplitstoreCompactionHot.M(int64(hotCnt))) stats.Record(context.Background(), metrics.SplitstoreCompactionCold.M(int64(coldCnt))) @@ -1110,8 +921,8 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { return xerrors.Errorf("compaction aborted") } - // 2.2 copy the cold objects to the coldstore -- if we have one - if !s.cfg.SkipMoveColdBlocks { + // 3. copy the cold objects to the coldstore -- if we have one + if !s.cfg.DiscardColdBlocks { log.Info("moving cold blocks to the coldstore") startMove := time.Now() err = s.moveColdBlocks(cold) @@ -1121,7 +932,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { log.Infow("moving done", "took", time.Since(startMove)) } - // 2.3 purge cold objects from the hotstore + // 4. purge cold objects from the hotstore, taking protected references into account log.Info("purging cold objects from the hotstore") startPurge := time.Now() err = s.purge(curTs, cold) @@ -1131,11 +942,6 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { log.Infow("purging cold from hotstore done", "took", time.Since(startPurge)) // we are done; do some housekeeping - err = s.tracker.Sync() - if err != nil { - return xerrors.Errorf("error syncing tracker: %w", err) - } - s.gcHotstore() err = s.setBaseEpoch(coldEpoch) @@ -1151,7 +957,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { return nil } -func (s *SplitStore) walk(ts *types.TipSet, boundary abi.ChainEpoch, inclMsgs, fullChain bool, +func (s *SplitStore) walkChain(ts *types.TipSet, boundary abi.ChainEpoch, inclMsgs bool, f func(cid.Cid) error) error { visited := cid.NewSet() walked := cid.NewSet() @@ -1179,25 +985,20 @@ func (s *SplitStore) walk(ts *types.TipSet, boundary abi.ChainEpoch, inclMsgs, f return xerrors.Errorf("error unmarshaling block header (cid: %s): %w", c, err) } - // don't walk under the boundary, unless we are walking the full chain - if hdr.Height < boundary && !fullChain { - return nil - } - - // we only scan the block if it 
is above the boundary + // we only scan the block if it is at or above the boundary if hdr.Height >= boundary { scanCnt++ if inclMsgs { - if err := s.walkLinks(hdr.Messages, walked, f); err != nil { + if err := s.walkObject(hdr.Messages, walked, f); err != nil { return xerrors.Errorf("error walking messages (cid: %s): %w", hdr.Messages, err) } - if err := s.walkLinks(hdr.ParentMessageReceipts, walked, f); err != nil { + if err := s.walkObject(hdr.ParentMessageReceipts, walked, f); err != nil { return xerrors.Errorf("error walking message receipts (cid: %s): %w", hdr.ParentMessageReceipts, err) } } - if err := s.walkLinks(hdr.ParentStateRoot, walked, f); err != nil { + if err := s.walkObject(hdr.ParentStateRoot, walked, f); err != nil { return xerrors.Errorf("error walking state root (cid: %s): %w", hdr.ParentStateRoot, err) } } @@ -1224,7 +1025,7 @@ func (s *SplitStore) walk(ts *types.TipSet, boundary abi.ChainEpoch, inclMsgs, f return nil } -func (s *SplitStore) walkLinks(c cid.Cid, walked *cid.Set, f func(cid.Cid) error) error { +func (s *SplitStore) walkObject(c cid.Cid, walked *cid.Set, f func(cid.Cid) error) error { if !walked.Visit(c) { return nil } @@ -1253,7 +1054,7 @@ func (s *SplitStore) walkLinks(c cid.Cid, walked *cid.Set, f func(cid.Cid) error } for _, c := range links { - err := s.walkLinks(c, walked, f) + err := s.walkObject(c, walked, f) if err != nil { return xerrors.Errorf("error walking link (cid: %s): %w", c, err) } @@ -1277,21 +1078,15 @@ func (s *SplitStore) view(cid cid.Cid, cb func([]byte) error) error { func (s *SplitStore) moveColdBlocks(cold []cid.Cid) error { batch := make([]blocks.Block, 0, batchSize) - for _, cid := range cold { - blk, err := s.hot.Get(cid) + for _, c := range cold { + blk, err := s.hot.Get(c) if err != nil { if err == bstore.ErrNotFound { - // this can happen if the node is killed after we have deleted the block from the hotstore - // but before we have deleted it from the tracker; just delete the tracker. 
- err = s.tracker.Delete(cid) - if err != nil { - return xerrors.Errorf("error deleting unreachable cid %s from tracker: %w", cid, err) - } - } else { - return xerrors.Errorf("error retrieving tracked block %s from hotstore: %w", cid, err) + log.Warnf("hotstore missing block %s", c) + continue } - continue + return xerrors.Errorf("error retrieving block %s from hotstore: %w", c, err) } batch = append(batch, blk) @@ -1367,18 +1162,12 @@ func (s *SplitStore) purge(curTs *types.TipSet, cids []cid.Cid) error { s.debug.LogMove(curTs, c) } - err := s.tracker.DeleteBatch(deadCids) - if err != nil { - return xerrors.Errorf("error purging tracking: %w", err) - } - - err = s.hot.DeleteMany(deadCids) + err := s.hot.DeleteMany(deadCids) if err != nil { return xerrors.Errorf("error purging cold objects: %w", err) } purgeCnt += len(deadCids) - return nil }) }