refactor warmup to trigger at startup and not wait for sync
This commit is contained in:
parent
3fe4261f12
commit
9b6448518c
@ -122,7 +122,6 @@ type SplitStore struct {
|
||||
baseEpoch abi.ChainEpoch
|
||||
syncGapEpoch abi.ChainEpoch
|
||||
warmupEpoch abi.ChainEpoch
|
||||
warm bool
|
||||
|
||||
coldPurgeSize int
|
||||
|
||||
@ -364,13 +363,12 @@ func (s *SplitStore) Start(chain ChainAccessor) error {
|
||||
switch err {
|
||||
case nil:
|
||||
s.warmupEpoch = bytesToEpoch(bs)
|
||||
s.warm = true
|
||||
|
||||
case dstore.ErrNotFound:
|
||||
// the hotstore hasn't warmed up, load the genesis into the hotstore
|
||||
err = s.loadGenesisState()
|
||||
err = s.warmup(s.curTs)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error loading genesis state: %w", err)
|
||||
return xerrors.Errorf("error warming up: %w", err)
|
||||
}
|
||||
|
||||
default:
|
||||
@ -449,32 +447,6 @@ func (s *SplitStore) HeadChange(_, apply []*types.TipSet) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
if !s.warm {
|
||||
// splitstore needs to warm up
|
||||
go func() {
|
||||
defer atomic.StoreInt32(&s.compacting, 0)
|
||||
|
||||
log.Info("warming up hotstore")
|
||||
start := time.Now()
|
||||
|
||||
baseTs, err := s.chain.GetTipsetByHeight(context.Background(), s.baseEpoch, curTs, true)
|
||||
if err != nil {
|
||||
log.Errorf("error warming up hotstore: error getting tipset at base epoch: %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
err = s.warmup(baseTs)
|
||||
if err != nil {
|
||||
log.Errorf("error warming up hotstore: %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
log.Infow("warm up done", "took", time.Since(start))
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
if epoch-s.baseEpoch > CompactionThreshold {
|
||||
// it's time to compact
|
||||
go func() {
|
||||
@ -495,6 +467,41 @@ func (s *SplitStore) HeadChange(_, apply []*types.TipSet) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *SplitStore) warmup(curTs *types.TipSet) error {
|
||||
err := s.loadGenesisState()
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error loading genesis state: %w")
|
||||
}
|
||||
|
||||
if !atomic.CompareAndSwapInt32(&s.compacting, 0, 1) {
|
||||
return xerrors.Errorf("error locking compaction")
|
||||
}
|
||||
|
||||
go func() {
|
||||
defer atomic.StoreInt32(&s.compacting, 0)
|
||||
|
||||
log.Info("warming up hotstore")
|
||||
start := time.Now()
|
||||
|
||||
err = s.doWarmup(curTs)
|
||||
if err != nil {
|
||||
log.Errorf("error warming up hotstore: %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
log.Infow("warm up done", "took", time.Since(start))
|
||||
}()
|
||||
|
||||
// save the warmup epoch
|
||||
s.warmupEpoch = curTs.Height()
|
||||
err = s.ds.Put(warmupEpochKey, epochToBytes(s.warmupEpoch))
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error saving warm up epoch: %w")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *SplitStore) loadGenesisState() error {
|
||||
// makes sure the genesis and its state root are hot
|
||||
gb, err := s.chain.GetGenesis()
|
||||
@ -554,7 +561,7 @@ func (s *SplitStore) loadGenesisState() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *SplitStore) warmup(curTs *types.TipSet) error {
|
||||
func (s *SplitStore) doWarmup(curTs *types.TipSet) error {
|
||||
epoch := curTs.Height()
|
||||
|
||||
batchHot := make([]blocks.Block, 0, batchSize)
|
||||
@ -629,14 +636,6 @@ func (s *SplitStore) warmup(curTs *types.TipSet) error {
|
||||
s.markSetSize = count + count>>2 // overestimate a bit
|
||||
}
|
||||
|
||||
// save the warmup epoch
|
||||
s.warm = true
|
||||
s.warmupEpoch = epoch
|
||||
err = s.ds.Put(warmupEpochKey, epochToBytes(epoch))
|
||||
if err != nil {
|
||||
log.Warnf("error saving warmup epoch: %s", err)
|
||||
}
|
||||
|
||||
err = s.ds.Put(markSetSizeKey, int64ToBytes(s.markSetSize))
|
||||
if err != nil {
|
||||
log.Warnf("error saving mark set size: %s", err)
|
||||
|
Loading…
Reference in New Issue
Block a user