lotus/blockstore/splitstore/splitstore_warmup.go

150 lines
3.5 KiB
Go
Raw Normal View History

package splitstore
import (
2022-01-26 07:01:51 +00:00
"sync"
"sync/atomic"
"time"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
ipld "github.com/ipfs/go-ipld-format"
2022-06-14 15:00:51 +00:00
"golang.org/x/xerrors"
2021-07-26 15:36:01 +00:00
"github.com/filecoin-project/go-state-types/abi"
2022-06-14 15:00:51 +00:00
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/types"
)
var (
// WarmupBoundary is the number of epochs to load state during warmup.
WarmupBoundary = build.Finality
)
// warmup acquires the compaction lock and spawns a goroutine to warm up the hotstore;
// this is necessary when we sync from a snapshot or when we enable the splitstore
// on top of an existing blockstore (which becomes the coldstore).
func (s *SplitStore) warmup(curTs *types.TipSet) error {
if !atomic.CompareAndSwapInt32(&s.compacting, 0, 1) {
return xerrors.Errorf("error locking compaction")
}
s.compactType = warmup
go func() {
defer atomic.StoreInt32(&s.compacting, 0)
log.Info("warming up hotstore")
start := time.Now()
err := s.doWarmup(curTs)
if err != nil {
log.Errorf("error warming up hotstore: %s", err)
return
}
log.Infow("warm up done", "took", time.Since(start))
}()
return nil
}
// the actual warmup procedure; it walks the chain loading all state roots at the boundary
// and headers all the way up to genesis.
// objects are written in batches so as to minimize overhead.
func (s *SplitStore) doWarmup(curTs *types.TipSet) error {
2021-07-26 15:36:01 +00:00
var boundaryEpoch abi.ChainEpoch
epoch := curTs.Height()
2021-07-26 15:36:01 +00:00
if WarmupBoundary < epoch {
boundaryEpoch = epoch - WarmupBoundary
}
2022-01-26 07:01:51 +00:00
var mx sync.Mutex
batchHot := make([]blocks.Block, 0, batchSize)
2022-01-26 07:01:51 +00:00
count := new(int64)
xcount := new(int64)
missing := new(int64)
visitor, err := s.markSetEnv.New("warmup", 0)
if err != nil {
return xerrors.Errorf("error creating visitor: %w", err)
}
defer visitor.Close() //nolint
err = s.walkChain(curTs, boundaryEpoch, epoch+1, // we don't load messages/receipts in warmup
visitor,
func(c cid.Cid) error {
if isUnitaryObject(c) {
return errStopWalk
}
2022-01-26 07:01:51 +00:00
atomic.AddInt64(count, 1)
2021-12-11 21:03:00 +00:00
has, err := s.hot.Has(s.ctx, c)
if err != nil {
return err
}
if has {
return nil
}
2021-12-11 21:03:00 +00:00
blk, err := s.cold.Get(s.ctx, c)
if err != nil {
if ipld.IsNotFound(err) {
2022-01-26 07:01:51 +00:00
atomic.AddInt64(missing, 1)
return errStopWalk
}
return err
}
2022-01-26 07:01:51 +00:00
atomic.AddInt64(xcount, 1)
2022-01-26 07:01:51 +00:00
mx.Lock()
batchHot = append(batchHot, blk)
if len(batchHot) == batchSize {
2021-12-11 21:03:00 +00:00
err = s.hot.PutMany(s.ctx, batchHot)
if err != nil {
2022-01-26 07:01:51 +00:00
mx.Unlock()
return err
}
batchHot = batchHot[:0]
}
2022-01-26 07:01:51 +00:00
mx.Unlock()
return nil
}, func(cid.Cid) error { return nil })
if err != nil {
return err
}
if len(batchHot) > 0 {
2021-12-11 21:03:00 +00:00
err = s.hot.PutMany(s.ctx, batchHot)
if err != nil {
return err
}
}
2022-01-26 07:01:51 +00:00
log.Infow("warmup stats", "visited", *count, "warm", *xcount, "missing", *missing)
2022-01-26 07:01:51 +00:00
s.markSetSize = *count + *count>>2 // overestimate a bit
2021-12-11 21:03:00 +00:00
err = s.ds.Put(s.ctx, markSetSizeKey, int64ToBytes(s.markSetSize))
if err != nil {
log.Warnf("error saving mark set size: %s", err)
}
// save the warmup epoch
2021-12-11 21:03:00 +00:00
err = s.ds.Put(s.ctx, warmupEpochKey, epochToBytes(epoch))
if err != nil {
return xerrors.Errorf("error saving warm up epoch: %w", err)
}
s.warmupEpoch.Store(int64(epoch))
// also save the compactionIndex, as this is used as an indicator of warmup for upgraded nodes
2021-12-11 21:03:00 +00:00
err = s.ds.Put(s.ctx, compactionIndexKey, int64ToBytes(s.compactionIndex))
if err != nil {
return xerrors.Errorf("error saving compaction index: %w", err)
}
return nil
}