RIP tracking store

This commit is contained in:
vyzo 2021-07-04 09:53:58 +03:00
parent d476a3db2c
commit 1f2b604c07
2 changed files with 158 additions and 368 deletions

View File

@ -18,6 +18,7 @@ import (
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/chain/types"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
)
@ -102,7 +103,7 @@ func (d *debugLog) LogReadMiss(curTs *types.TipSet, cid cid.Cid) {
}
}
func (d *debugLog) LogWrite(curTs *types.TipSet, c cid.Cid, writeEpoch abi.ChainEpoch) {
func (d *debugLog) LogWrite(curTs *types.TipSet, blk blocks.Block, writeEpoch abi.ChainEpoch) {
if d == nil {
return
}
@ -122,13 +123,13 @@ func (d *debugLog) LogWrite(curTs *types.TipSet, c cid.Cid, writeEpoch abi.Chain
d.writeCnt++
_, err := fmt.Fprintf(d.writeLog, "%s %d %s %d%s\n", d.timestamp(), curEpoch, c, writeEpoch, stack)
_, err := fmt.Fprintf(d.writeLog, "%s %d %s %d%s\n", d.timestamp(), curEpoch, blk.Cid(), writeEpoch, stack)
if err != nil {
log.Warnf("error writing write log: %s", err)
}
}
func (d *debugLog) LogWriteMany(curTs *types.TipSet, cids []cid.Cid, writeEpoch abi.ChainEpoch) {
func (d *debugLog) LogWriteMany(curTs *types.TipSet, blks []blocks.Block, writeEpoch abi.ChainEpoch) {
if d == nil {
return
}
@ -146,11 +147,11 @@ func (d *debugLog) LogWriteMany(curTs *types.TipSet, cids []cid.Cid, writeEpoch
d.writeMx.Lock()
defer d.writeMx.Unlock()
d.writeCnt += len(cids)
d.writeCnt += len(blks)
now := d.timestamp()
for _, c := range cids {
_, err := fmt.Fprintf(d.writeLog, "%s %d %s %d%s\n", now, curEpoch, c, writeEpoch, stack)
for _, blk := range blks {
_, err := fmt.Fprintf(d.writeLog, "%s %d %s %d%s\n", now, curEpoch, blk.Cid(), writeEpoch, stack)
if err != nil {
log.Warnf("error writing write log: %s", err)
break

View File

@ -103,16 +103,11 @@ type Config struct {
// Supported values are: "bloom" (default if omitted), "bolt".
MarkSetType string
// HotHeaders indicates whether to keep chain block headers in hotstore or not.
// This is necessary, and automatically set by DI in lotus node construction, if
// you are running with a noop coldstore.
HotHeaders bool
// SkipMoveColdBlocks indicates whether to skip moving cold blocks to the coldstore.
// If the splitstore is running with a noop coldstore then this option is set to true
// which skips moving (as it is a noop, but still takes time to read all the cold objects)
// and directly purges cold blocks.
SkipMoveColdBlocks bool
DiscardColdBlocks bool
}
// ChainAccessor allows the Splitstore to access the chain. It will most likely
@ -140,11 +135,10 @@ type SplitStore struct {
mx sync.Mutex
curTs *types.TipSet
chain ChainAccessor
ds dstore.Datastore
hot bstore.Blockstore
cold bstore.Blockstore
tracker TrackingStore
chain ChainAccessor
ds dstore.Datastore
hot bstore.Blockstore
cold bstore.Blockstore
markSetEnv MarkSetEnv
markSetSize int64
@ -159,9 +153,8 @@ type SplitStore struct {
txnEnv MarkSetEnv
txnProtect MarkSet
txnMarkSet MarkSet
// pending write set
pendingWrites map[cid.Cid]struct{}
txnRefs map[cid.Cid]struct{}
txnActive bool
}
var _ bstore.Blockstore = (*SplitStore)(nil)
@ -170,23 +163,20 @@ var _ bstore.Blockstore = (*SplitStore)(nil)
// is backed by the provided hot and cold stores. The returned SplitStore MUST be
// attached to the ChainStore with Start in order to trigger compaction.
func Open(path string, ds dstore.Datastore, hot, cold bstore.Blockstore, cfg *Config) (*SplitStore, error) {
// the tracking store
tracker, err := OpenTrackingStore(path, cfg.TrackingStoreType)
if err != nil {
return nil, err
// hot blockstore must support BlockstoreIterator
if _, ok := hot.(bstore.BlockstoreIterator); !ok {
return nil, xerrors.Errorf("hot blockstore does not support efficient iteration: %T", hot)
}
// the markset env
markSetEnv, err := OpenMarkSetEnv(path, "bolt")
markSetEnv, err := OpenMarkSetEnv(path, "mapts")
if err != nil {
_ = tracker.Close()
return nil, err
}
// the txn markset env
txnEnv, err := OpenMarkSetEnv(path, "mapts")
if err != nil {
_ = tracker.Close()
_ = markSetEnv.Close()
return nil, err
}
@ -197,13 +187,10 @@ func Open(path string, ds dstore.Datastore, hot, cold bstore.Blockstore, cfg *Co
ds: ds,
hot: hot,
cold: cold,
tracker: tracker,
markSetEnv: markSetEnv,
txnEnv: txnEnv,
coldPurgeSize: defaultColdPurgeSize,
pendingWrites: make(map[cid.Cid]struct{}),
}
ss.ctx, ss.cancel = context.WithCancel(context.Background())
@ -244,13 +231,6 @@ func (s *SplitStore) Has(c cid.Cid) (bool, error) {
// -- the vm uses this check to avoid duplicate writes on Copy.
// When we have options in the API (or something better), the vm can explicitly signal
// that this is an implicit Write.
s.trackWrite(c)
// also make sure the object is considered live during compaction in case we have already
// flushed pending writes and started compaction.
// when within vm copy context, dags will be recursively referenced.
// in case of a race with purge, this will return a track error, which we can use to
// signal to the vm that the object is not fully present.
err = s.trackTxnRef(c, true)
if xerrors.Is(err, errMissingObject) {
// we failed to recursively protect the object because some inner object has been purged;
@ -276,12 +256,14 @@ func (s *SplitStore) Get(cid cid.Cid) (blocks.Block, error) {
return blk, err
case bstore.ErrNotFound:
s.mx.Lock()
warmup := s.warmupEpoch > 0
curTs := s.curTs
s.mx.Unlock()
if warmup {
s.debug.LogReadMiss(curTs, cid)
if s.debug != nil {
s.mx.Lock()
warm := s.warmupEpoch > 0
curTs := s.curTs
s.mx.Unlock()
if warm {
s.debug.LogReadMiss(curTs, cid)
}
}
blk, err = s.cold.Get(cid)
@ -308,12 +290,14 @@ func (s *SplitStore) GetSize(cid cid.Cid) (int, error) {
return size, err
case bstore.ErrNotFound:
s.mx.Lock()
warmup := s.warmupEpoch > 0
curTs := s.curTs
s.mx.Unlock()
if warmup {
s.debug.LogReadMiss(curTs, cid)
if s.debug != nil {
s.mx.Lock()
warm := s.warmupEpoch > 0
curTs := s.curTs
s.mx.Unlock()
if warm {
s.debug.LogReadMiss(curTs, cid)
}
}
size, err = s.cold.GetSize(cid)
@ -333,7 +317,13 @@ func (s *SplitStore) Put(blk blocks.Block) error {
err := s.hot.Put(blk)
if err == nil {
s.trackWrite(blk.Cid())
if s.debug != nil {
s.mx.Lock()
curTs := s.curTs
writeEpoch := s.writeEpoch
s.mx.Unlock()
s.debug.LogWrite(curTs, blk, writeEpoch)
}
err = s.trackTxnRef(blk.Cid(), false)
}
@ -351,7 +341,14 @@ func (s *SplitStore) PutMany(blks []blocks.Block) error {
err := s.hot.PutMany(blks)
if err == nil {
s.trackWriteMany(batch)
if s.debug != nil {
s.mx.Lock()
curTs := s.curTs
writeEpoch := s.writeEpoch
s.mx.Unlock()
s.debug.LogWriteMany(curTs, blks, writeEpoch)
}
err = s.trackTxnRefMany(batch)
}
@ -408,12 +405,14 @@ func (s *SplitStore) View(cid cid.Cid, cb func([]byte) error) error {
return err
case bstore.ErrNotFound:
s.mx.Lock()
warmup := s.warmupEpoch > 0
curTs := s.curTs
s.mx.Unlock()
if warmup {
s.debug.LogReadMiss(curTs, cid)
if s.debug != nil {
s.mx.Lock()
warm := s.warmupEpoch > 0
curTs := s.curTs
s.mx.Unlock()
if warm {
s.debug.LogReadMiss(curTs, cid)
}
}
err = s.cold.View(cid, cb)
@ -485,11 +484,11 @@ func (s *SplitStore) Start(chain ChainAccessor) error {
return xerrors.Errorf("error loading mark set size: %w", err)
}
s.updateWriteEpoch()
log.Infow("starting splitstore", "baseEpoch", s.baseEpoch, "warmupEpoch", s.warmupEpoch)
log.Infow("starting splitstore", "baseEpoch", s.baseEpoch, "warmupEpoch", s.warmupEpoch, "writeEpoch", s.writeEpoch)
go s.background()
if s.debug != nil {
go s.background()
}
// watch the chain
chain.SubscribeHeadChanges(s.HeadChange)
@ -507,9 +506,8 @@ func (s *SplitStore) Close() error {
}
}
s.flushPendingWrites(false)
s.cancel()
return multierr.Combine(s.tracker.Close(), s.markSetEnv.Close(), s.debug.Close())
return multierr.Combine(s.markSetEnv.Close(), s.debug.Close())
}
func (s *SplitStore) HeadChange(_, apply []*types.TipSet) error {
@ -524,8 +522,6 @@ func (s *SplitStore) HeadChange(_, apply []*types.TipSet) error {
s.curTs = curTs
s.mx.Unlock()
s.updateWriteEpoch()
timestamp := time.Unix(int64(curTs.MinTimestamp()), 0)
if time.Since(timestamp) > SyncGapTime {
// don't attempt compaction before we have caught up syncing
@ -557,6 +553,21 @@ func (s *SplitStore) HeadChange(_, apply []*types.TipSet) error {
return nil
}
func (s *SplitStore) background() {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case <-s.ctx.Done():
return
case <-ticker.C:
s.updateWriteEpoch()
}
}
}
func (s *SplitStore) updateWriteEpoch() {
s.mx.Lock()
defer s.mx.Unlock()
@ -568,7 +579,6 @@ func (s *SplitStore) updateWriteEpoch() {
if dt < 0 {
writeEpoch := curTs.Height() + 1
if writeEpoch > s.writeEpoch {
s.flushPendingWrites(true)
s.writeEpoch = writeEpoch
}
@ -577,67 +587,29 @@ func (s *SplitStore) updateWriteEpoch() {
writeEpoch := curTs.Height() + abi.ChainEpoch(dt.Seconds())/builtin.EpochDurationSeconds + 1
if writeEpoch > s.writeEpoch {
s.flushPendingWrites(true)
s.writeEpoch = writeEpoch
}
}
// Unfortunately we can't just directly tracker.Put one by one, as it is ridiculously slow with
// bbolt because of syncing (order of 10ms), so we batch them.
func (s *SplitStore) trackWrite(c cid.Cid) {
s.mx.Lock()
defer s.mx.Unlock()
s.pendingWrites[c] = struct{}{}
}
// and also combine batch writes into them
func (s *SplitStore) trackWriteMany(cids []cid.Cid) {
s.mx.Lock()
defer s.mx.Unlock()
for _, c := range cids {
s.pendingWrites[c] = struct{}{}
}
}
func (s *SplitStore) flushPendingWrites(locked bool) {
if !locked {
s.mx.Lock()
defer s.mx.Unlock()
}
if len(s.pendingWrites) == 0 {
return
}
cids := make([]cid.Cid, 0, len(s.pendingWrites))
for c := range s.pendingWrites {
cids = append(cids, c)
}
s.pendingWrites = make(map[cid.Cid]struct{})
epoch := s.writeEpoch
err := s.tracker.PutBatch(cids, epoch)
if err != nil {
log.Errorf("error putting write batch to tracker: %s", err)
}
s.debug.LogWriteMany(s.curTs, cids, epoch)
}
func (s *SplitStore) trackTxnRef(c cid.Cid, recursive bool) error {
if s.txnProtect == nil {
if !s.txnActive {
// not compacting
return nil
}
if s.txnRefs != nil {
// we haven't finished marking yet, so track the reference
s.txnRefs[c] = struct{}{}
return nil
}
// we have finished marking, protect the reference
if !recursive {
return s.txnProtect.Mark(c)
}
// it's a recursive reference in vm context, protect links if they are not in the markset already
return s.walkLinks(c, cid.NewSet(), func(c cid.Cid) error {
return s.walkObject(c, cid.NewSet(), func(c cid.Cid) error {
mark, err := s.txnMarkSet.Has(c)
if err != nil {
return xerrors.Errorf("error checking mark set for %s: %w", c, err)
@ -675,7 +647,7 @@ func (s *SplitStore) trackTxnRef(c cid.Cid, recursive bool) error {
}
func (s *SplitStore) trackTxnRefMany(cids []cid.Cid) error {
if s.txnProtect == nil {
if !s.txnActive {
// not compacting
return nil
}
@ -691,27 +663,7 @@ func (s *SplitStore) trackTxnRefMany(cids []cid.Cid) error {
return err
}
func (s *SplitStore) background() {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case <-s.ctx.Done():
return
case <-ticker.C:
s.updateWriteEpoch()
}
}
}
func (s *SplitStore) warmup(curTs *types.TipSet) error {
err := s.loadGenesisState()
if err != nil {
return xerrors.Errorf("error loading genesis state: %w", err)
}
if !atomic.CompareAndSwapInt32(&s.compacting, 0, 1) {
return xerrors.Errorf("error locking compaction")
}
@ -722,7 +674,7 @@ func (s *SplitStore) warmup(curTs *types.TipSet) error {
log.Info("warming up hotstore")
start := time.Now()
err = s.doWarmup(curTs)
err := s.doWarmup(curTs)
if err != nil {
log.Errorf("error warming up hotstore: %s", err)
return
@ -734,75 +686,13 @@ func (s *SplitStore) warmup(curTs *types.TipSet) error {
return nil
}
func (s *SplitStore) loadGenesisState() error {
// makes sure the genesis and its state root are hot
gb, err := s.chain.GetGenesis()
if err != nil {
return xerrors.Errorf("error getting genesis: %w", err)
}
genesis := gb.Cid()
genesisStateRoot := gb.ParentStateRoot
has, err := s.hot.Has(genesis)
if err != nil {
return xerrors.Errorf("error checking hotstore for genesis: %w", err)
}
if !has {
blk, err := gb.ToStorageBlock()
if err != nil {
return xerrors.Errorf("error converting genesis block to storage block: %w", err)
}
err = s.hot.Put(blk)
if err != nil {
return xerrors.Errorf("error putting genesis block to hotstore: %w", err)
}
}
err = s.walkLinks(genesisStateRoot, cid.NewSet(), func(c cid.Cid) error {
has, err = s.hot.Has(c)
if err != nil {
return xerrors.Errorf("error checking hotstore for genesis state root: %w", err)
}
if !has {
blk, err := s.cold.Get(c)
if err != nil {
if err == bstore.ErrNotFound {
return nil
}
return xerrors.Errorf("error retrieving genesis state linked object from coldstore: %w", err)
}
err = s.hot.Put(blk)
if err != nil {
return xerrors.Errorf("error putting genesis state linked object to hotstore: %w", err)
}
}
return nil
})
if err != nil {
return xerrors.Errorf("error walking genesis state root links: %w", err)
}
return nil
}
func (s *SplitStore) doWarmup(curTs *types.TipSet) error {
epoch := curTs.Height()
batchHot := make([]blocks.Block, 0, batchSize)
batchSnoop := make([]cid.Cid, 0, batchSize)
count := int64(0)
xcount := int64(0)
missing := int64(0)
err := s.walk(curTs, epoch, false, s.cfg.HotHeaders,
err := s.walkChain(curTs, epoch, false,
func(cid cid.Cid) error {
count++
@ -827,15 +717,7 @@ func (s *SplitStore) doWarmup(curTs *types.TipSet) error {
xcount++
batchHot = append(batchHot, blk)
batchSnoop = append(batchSnoop, cid)
if len(batchHot) == batchSize {
err = s.tracker.PutBatch(batchSnoop, epoch)
if err != nil {
return err
}
batchSnoop = batchSnoop[:0]
err = s.hot.PutMany(batchHot)
if err != nil {
return err
@ -851,11 +733,6 @@ func (s *SplitStore) doWarmup(curTs *types.TipSet) error {
}
if len(batchHot) > 0 {
err = s.tracker.PutBatch(batchSnoop, epoch)
if err != nil {
return err
}
err = s.hot.PutMany(batchHot)
if err != nil {
return err
@ -885,22 +762,8 @@ func (s *SplitStore) doWarmup(curTs *types.TipSet) error {
// Compaction/GC Algorithm
func (s *SplitStore) compact(curTs *types.TipSet) {
var err error
if s.markSetSize == 0 {
start := time.Now()
log.Info("estimating mark set size")
err = s.estimateMarkSetSize(curTs)
if err != nil {
log.Errorf("error estimating mark set size: %s; aborting compaction", err)
return
}
log.Infow("estimating mark set size done", "took", time.Since(start), "size", s.markSetSize)
} else {
log.Infow("current mark set size estimate", "size", s.markSetSize)
}
start := time.Now()
err = s.doCompact(curTs)
err := s.doCompact(curTs)
took := time.Since(start).Milliseconds()
stats.Record(context.Background(), metrics.SplitstoreCompactionTimeSeconds.M(float64(took)/1e3))
@ -909,24 +772,6 @@ func (s *SplitStore) compact(curTs *types.TipSet) {
}
}
func (s *SplitStore) estimateMarkSetSize(curTs *types.TipSet) error {
epoch := curTs.Height()
var count int64
err := s.walk(curTs, epoch, false, s.cfg.HotHeaders,
func(cid cid.Cid) error {
count++
return nil
})
if err != nil {
return err
}
s.markSetSize = count + count>>2 // overestimate a bit
return nil
}
func (s *SplitStore) doCompact(curTs *types.TipSet) error {
currentEpoch := curTs.Height()
boundaryEpoch := currentEpoch - CompactionBoundary
@ -941,12 +786,18 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
defer markSet.Close() //nolint:errcheck
defer s.debug.Flush()
// 0. Prepare the transaction
s.txnLk.Lock()
s.txnRefs = make(map[cid.Cid]struct{})
s.txnActive = true
s.txnLk.Unlock()
// 1. mark reachable objects by walking the chain from the current epoch to the boundary epoch
log.Infow("marking reachable blocks", "currentEpoch", currentEpoch, "boundaryEpoch", boundaryEpoch)
startMark := time.Now()
var count int64
err = s.walk(curTs, boundaryEpoch, true, s.cfg.HotHeaders,
err = s.walkChain(curTs, boundaryEpoch, true,
func(c cid.Cid) error {
count++
return markSet.Mark(c)
@ -962,8 +813,10 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
log.Infow("marking done", "took", time.Since(startMark), "marked", count)
// create the transaction protect filter
// fetch refernces taken during marking and create the transaction protect filter
s.txnLk.Lock()
txnRefs := s.txnRefs
s.txnRefs = nil
s.txnProtect, err = s.txnEnv.Create("protected", 0)
if err != nil {
s.txnLk.Unlock()
@ -975,28 +828,56 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
defer func() {
s.txnLk.Lock()
_ = s.txnProtect.Close()
s.txnActive = false
s.txnProtect = nil
s.txnMarkSet = nil
s.txnLk.Unlock()
}()
// flush pending writes to update the tracker
s.flushPendingWrites(false)
// 1.1 Update markset for references created during marking
log.Info("updating mark set for live references")
startMark = time.Now()
walked := cid.NewSet()
count = 0
for c := range txnRefs {
mark, err := markSet.Has(c)
if err != nil {
return xerrors.Errorf("error checking markset for %s: %w", c, err)
}
// 2. move cold unreachable objects to the coldstore
log.Info("collecting candidate cold objects")
if mark {
continue
}
err = s.walkObject(c, walked, func(c cid.Cid) error {
mark, err := markSet.Has(c)
if err != nil {
return xerrors.Errorf("error checking markset for %s: %w", c, err)
}
if mark {
return errStopWalk
}
count++
return markSet.Mark(c)
})
if err != nil {
return xerrors.Errorf("error walking %s for marking: %w", c, err)
}
}
log.Infow("update marking set done", "took", time.Since(startMark), "marked", count)
// 2. iterate through the hotstore to collect cold objects
log.Info("collecting cold objects")
startCollect := time.Now()
candidates := make(map[cid.Cid]struct{}, s.coldPurgeSize)
var towalk []cid.Cid
// some stats for logging
var hotCnt, coldCnt, slackCnt, liveCnt int
var hotCnt, coldCnt int
// 2.1 iterate through the tracking store and collect unreachable cold objects
// for every hot object that is a dag and not in the markset, walk for links and
// and mark reachable objects
err = s.tracker.ForEach(func(c cid.Cid, writeEpoch abi.ChainEpoch) error {
cold := make([]cid.Cid, 0, s.coldPurgeSize)
err = s.hot.(bstore.BlockstoreIterator).ForEachKey(func(c cid.Cid) error {
// was it marked?
mark, err := markSet.Has(c)
if err != nil {
@ -1008,30 +889,8 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
return nil
}
// is the object still hot?
if writeEpoch >= boundaryEpoch {
// yes, stay in the hotstore
hotCnt++
// if it is a DAG, add it to the walk list to recursively update the markset
if c.Prefix().Codec != cid.DagCBOR {
return nil
}
towalk = append(towalk, c)
return nil
}
// is the object in slack region?
if writeEpoch > coldEpoch {
// yes stay in the hotstore, but we wont walk you
slackCnt++
return nil
}
// it's cold, mark it as candidate for move
candidates[c] = struct{}{}
cold = append(cold, c)
coldCnt++
return nil
@ -1047,55 +906,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
s.coldPurgeSize = coldCnt + coldCnt>>2 // overestimate a bit
}
// walk hot dags that were not marked and recursively update the mark set
log.Info("updating mark set for hot dags")
startMark = time.Now()
count = 0
walked := cid.NewSet()
for _, c := range towalk {
err = s.walkLinks(c, walked, func(c cid.Cid) error {
mark, err := markSet.Has(c)
if err != nil {
return xerrors.Errorf("error checking mark set for %s: %w", c, err)
}
if mark {
// already marked, don't recurse its links
return errStopWalk
}
count++
return markSet.Mark(c)
})
if err != nil {
return xerrors.Errorf("error walking %s: %w", c, err)
}
}
log.Infow("updating mark set done", "took", time.Since(startMark), "marked", count)
// filter the candidate set for objects newly marked as hot
for c := range candidates {
mark, err := markSet.Has(c)
if err != nil {
return xerrors.Errorf("error checking mark set for %s: %w", c, err)
}
if mark {
delete(candidates, c)
liveCnt++
}
}
// create the cold object list
cold := make([]cid.Cid, 0, len(candidates))
for c := range candidates {
cold = append(cold, c)
}
log.Infow("compaction stats", "hot", hotCnt, "cold", coldCnt, "live", liveCnt, "slack", slackCnt)
log.Infow("compaction stats", "hot", hotCnt, "cold", coldCnt)
stats.Record(context.Background(), metrics.SplitstoreCompactionHot.M(int64(hotCnt)))
stats.Record(context.Background(), metrics.SplitstoreCompactionCold.M(int64(coldCnt)))
@ -1110,8 +921,8 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
return xerrors.Errorf("compaction aborted")
}
// 2.2 copy the cold objects to the coldstore -- if we have one
if !s.cfg.SkipMoveColdBlocks {
// 3. copy the cold objects to the coldstore -- if we have one
if !s.cfg.DiscardColdBlocks {
log.Info("moving cold blocks to the coldstore")
startMove := time.Now()
err = s.moveColdBlocks(cold)
@ -1121,7 +932,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
log.Infow("moving done", "took", time.Since(startMove))
}
// 2.3 purge cold objects from the hotstore
// 4. purge cold objects from the hotstore, taking protected references into account
log.Info("purging cold objects from the hotstore")
startPurge := time.Now()
err = s.purge(curTs, cold)
@ -1131,11 +942,6 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
log.Infow("purging cold from hotstore done", "took", time.Since(startPurge))
// we are done; do some housekeeping
err = s.tracker.Sync()
if err != nil {
return xerrors.Errorf("error syncing tracker: %w", err)
}
s.gcHotstore()
err = s.setBaseEpoch(coldEpoch)
@ -1151,7 +957,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
return nil
}
func (s *SplitStore) walk(ts *types.TipSet, boundary abi.ChainEpoch, inclMsgs, fullChain bool,
func (s *SplitStore) walkChain(ts *types.TipSet, boundary abi.ChainEpoch, inclMsgs bool,
f func(cid.Cid) error) error {
visited := cid.NewSet()
walked := cid.NewSet()
@ -1179,25 +985,20 @@ func (s *SplitStore) walk(ts *types.TipSet, boundary abi.ChainEpoch, inclMsgs, f
return xerrors.Errorf("error unmarshaling block header (cid: %s): %w", c, err)
}
// don't walk under the boundary, unless we are walking the full chain
if hdr.Height < boundary && !fullChain {
return nil
}
// we only scan the block if it is above the boundary
// we only scan the block if it is at or above the boundary
if hdr.Height >= boundary {
scanCnt++
if inclMsgs {
if err := s.walkLinks(hdr.Messages, walked, f); err != nil {
if err := s.walkObject(hdr.Messages, walked, f); err != nil {
return xerrors.Errorf("error walking messages (cid: %s): %w", hdr.Messages, err)
}
if err := s.walkLinks(hdr.ParentMessageReceipts, walked, f); err != nil {
if err := s.walkObject(hdr.ParentMessageReceipts, walked, f); err != nil {
return xerrors.Errorf("error walking message receipts (cid: %s): %w", hdr.ParentMessageReceipts, err)
}
}
if err := s.walkLinks(hdr.ParentStateRoot, walked, f); err != nil {
if err := s.walkObject(hdr.ParentStateRoot, walked, f); err != nil {
return xerrors.Errorf("error walking state root (cid: %s): %w", hdr.ParentStateRoot, err)
}
}
@ -1224,7 +1025,7 @@ func (s *SplitStore) walk(ts *types.TipSet, boundary abi.ChainEpoch, inclMsgs, f
return nil
}
func (s *SplitStore) walkLinks(c cid.Cid, walked *cid.Set, f func(cid.Cid) error) error {
func (s *SplitStore) walkObject(c cid.Cid, walked *cid.Set, f func(cid.Cid) error) error {
if !walked.Visit(c) {
return nil
}
@ -1253,7 +1054,7 @@ func (s *SplitStore) walkLinks(c cid.Cid, walked *cid.Set, f func(cid.Cid) error
}
for _, c := range links {
err := s.walkLinks(c, walked, f)
err := s.walkObject(c, walked, f)
if err != nil {
return xerrors.Errorf("error walking link (cid: %s): %w", c, err)
}
@ -1277,21 +1078,15 @@ func (s *SplitStore) view(cid cid.Cid, cb func([]byte) error) error {
func (s *SplitStore) moveColdBlocks(cold []cid.Cid) error {
batch := make([]blocks.Block, 0, batchSize)
for _, cid := range cold {
blk, err := s.hot.Get(cid)
for _, c := range cold {
blk, err := s.hot.Get(c)
if err != nil {
if err == bstore.ErrNotFound {
// this can happen if the node is killed after we have deleted the block from the hotstore
// but before we have deleted it from the tracker; just delete the tracker.
err = s.tracker.Delete(cid)
if err != nil {
return xerrors.Errorf("error deleting unreachable cid %s from tracker: %w", cid, err)
}
} else {
return xerrors.Errorf("error retrieving tracked block %s from hotstore: %w", cid, err)
log.Warnf("hotstore missing block %s", c)
continue
}
continue
return xerrors.Errorf("error retrieving block %s from hotstore: %w", c, err)
}
batch = append(batch, blk)
@ -1367,18 +1162,12 @@ func (s *SplitStore) purge(curTs *types.TipSet, cids []cid.Cid) error {
s.debug.LogMove(curTs, c)
}
err := s.tracker.DeleteBatch(deadCids)
if err != nil {
return xerrors.Errorf("error purging tracking: %w", err)
}
err = s.hot.DeleteMany(deadCids)
err := s.hot.DeleteMany(deadCids)
if err != nil {
return xerrors.Errorf("error purging cold objects: %w", err)
}
purgeCnt += len(deadCids)
return nil
})
}