warm up splitstore at first head change notification
This commit is contained in:
parent
1a804fbdec
commit
cb36d5b6a4
@ -29,7 +29,10 @@ var (
|
||||
CompactionCold = build.Finality
|
||||
)
|
||||
|
||||
var baseEpochKey = dstore.NewKey("baseEpoch")
|
||||
var (
|
||||
baseEpochKey = dstore.NewKey("/splitstore/baseEpoch")
|
||||
warmupEpochKey = dstore.NewKey("/splitstore/warmupEpoch")
|
||||
)
|
||||
|
||||
var log = logging.Logger("splitstore")
|
||||
|
||||
@ -65,6 +68,7 @@ type SplitStore struct {
|
||||
skipMsgReceipts bool
|
||||
|
||||
baseEpoch abi.ChainEpoch
|
||||
warmupEpoch abi.ChainEpoch
|
||||
|
||||
mx sync.Mutex
|
||||
curTs *types.TipSet
|
||||
@ -275,11 +279,25 @@ func (s *SplitStore) Start(cs *store.ChainStore) error {
|
||||
|
||||
err = s.setBaseEpoch(s.curTs.Height())
|
||||
if err != nil {
|
||||
return err
|
||||
return xerrors.Errorf("error saving base epoch: %w", err)
|
||||
}
|
||||
|
||||
default:
|
||||
return err
|
||||
return xerrors.Errorf("error loading base epoch: %w", err)
|
||||
}
|
||||
|
||||
// load warmup epoch from metadata ds
|
||||
// if none, then the splitstore will warm up the hotstore at first head change notif
|
||||
// by walking the current tipset
|
||||
bs, err = s.ds.Get(warmupEpochKey)
|
||||
switch err {
|
||||
case nil:
|
||||
s.warmupEpoch = bytesToEpoch(bs)
|
||||
|
||||
case dstore.ErrNotFound:
|
||||
|
||||
default:
|
||||
return xerrors.Errorf("error loading warmup epoch: %w", err)
|
||||
}
|
||||
|
||||
// watch the chain
|
||||
@ -310,7 +328,25 @@ func (s *SplitStore) HeadChange(revert, apply []*types.TipSet) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
if s.warmupEpoch == 0 {
|
||||
// splitstore needs to warm up
|
||||
s.warmupEpoch = epoch
|
||||
go func() {
|
||||
defer atomic.StoreInt32(&s.compacting, 0)
|
||||
|
||||
log.Info("warming up hotstore")
|
||||
start := time.Now()
|
||||
|
||||
s.warmup()
|
||||
|
||||
log.Infow("warm up done", "took", time.Since(start))
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
if epoch-s.baseEpoch > CompactionThreshold {
|
||||
// it's time to compact
|
||||
go func() {
|
||||
defer atomic.StoreInt32(&s.compacting, 0)
|
||||
|
||||
@ -329,6 +365,49 @@ func (s *SplitStore) HeadChange(revert, apply []*types.TipSet) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *SplitStore) warmup() {
|
||||
s.mx.Lock()
|
||||
curTs := s.curTs
|
||||
epoch := curTs.Height()
|
||||
s.mx.Unlock()
|
||||
|
||||
err := s.cs.WalkSnapshot(context.Background(), curTs, 1, s.skipOldMsgs, s.skipMsgReceipts,
|
||||
func(cid cid.Cid) error {
|
||||
has, err := s.hot.Has(cid)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if has {
|
||||
return nil
|
||||
}
|
||||
|
||||
blk, err := s.cold.Get(cid)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = s.snoop.Put(cid, epoch)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return s.hot.Put(blk)
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
log.Errorf("error warming up splitstore: %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
// save the warmup epoch
|
||||
s.warmupEpoch = epoch
|
||||
err = s.ds.Put(warmupEpochKey, epochToBytes(epoch))
|
||||
if err != nil {
|
||||
log.Errorf("error saving warmup epoch: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Compaction/GC Algorithm
|
||||
func (s *SplitStore) compact() {
|
||||
if s.liveSetSize == 0 {
|
||||
|
Loading…
Reference in New Issue
Block a user