From cb36d5b6a461731b9f5e929a40209b1a3269d295 Mon Sep 17 00:00:00 2001 From: vyzo Date: Mon, 1 Mar 2021 18:41:51 +0200 Subject: [PATCH] warm up splitstore at first head change notification --- blockstore/splitstore/splitstore.go | 87 +++++++++++++++++++++++++++-- 1 file changed, 83 insertions(+), 4 deletions(-) diff --git a/blockstore/splitstore/splitstore.go b/blockstore/splitstore/splitstore.go index 56283eb92..6d6e8aba9 100644 --- a/blockstore/splitstore/splitstore.go +++ b/blockstore/splitstore/splitstore.go @@ -29,7 +29,10 @@ var ( CompactionCold = build.Finality ) -var baseEpochKey = dstore.NewKey("baseEpoch") +var ( + baseEpochKey = dstore.NewKey("/splitstore/baseEpoch") + warmupEpochKey = dstore.NewKey("/splitstore/warmupEpoch") +) var log = logging.Logger("splitstore") @@ -64,7 +67,8 @@ type SplitStore struct { skipOldMsgs bool skipMsgReceipts bool - baseEpoch abi.ChainEpoch + baseEpoch abi.ChainEpoch + warmupEpoch abi.ChainEpoch mx sync.Mutex curTs *types.TipSet @@ -275,11 +279,25 @@ func (s *SplitStore) Start(cs *store.ChainStore) error { err = s.setBaseEpoch(s.curTs.Height()) if err != nil { - return err + return xerrors.Errorf("error saving base epoch: %w", err) } default: - return err + return xerrors.Errorf("error loading base epoch: %w", err) + } + + // load warmup epoch from metadata ds + // if none, then the splitstore will warm up the hotstore at first head change notif + // by walking the current tipset + bs, err = s.ds.Get(warmupEpochKey) + switch err { + case nil: + s.warmupEpoch = bytesToEpoch(bs) + + case dstore.ErrNotFound: + + default: + return xerrors.Errorf("error loading warmup epoch: %w", err) } // watch the chain @@ -310,7 +328,25 @@ func (s *SplitStore) HeadChange(revert, apply []*types.TipSet) error { return nil } + if s.warmupEpoch == 0 { + // splitstore needs to warm up + s.warmupEpoch = epoch + go func() { + defer atomic.StoreInt32(&s.compacting, 0) + + log.Info("warming up hotstore") + start := time.Now() + + s.warmup() + + log.Infow("warm up done", "took", time.Since(start)) + }() + + return nil + } + if epoch-s.baseEpoch > CompactionThreshold { + // it's time to compact go func() { defer atomic.StoreInt32(&s.compacting, 0) @@ -329,6 +365,49 @@ func (s *SplitStore) HeadChange(revert, apply []*types.TipSet) error { return nil } +func (s *SplitStore) warmup() { + s.mx.Lock() + curTs := s.curTs + epoch := curTs.Height() + s.mx.Unlock() + + err := s.cs.WalkSnapshot(context.Background(), curTs, 1, s.skipOldMsgs, s.skipMsgReceipts, + func(cid cid.Cid) error { + has, err := s.hot.Has(cid) + if err != nil { + return err + } + + if has { + return nil + } + + blk, err := s.cold.Get(cid) + if err != nil { + return err + } + + err = s.snoop.Put(cid, epoch) + if err != nil { + return err + } + + return s.hot.Put(blk) + }) + + if err != nil { + log.Errorf("error warming up splitstore: %s", err) + return + } + + // save the warmup epoch + s.warmupEpoch = epoch + err = s.ds.Put(warmupEpochKey, epochToBytes(epoch)) + if err != nil { + log.Errorf("error saving warmup epoch: %s", err) + } +} + // Compaction/GC Algorithm func (s *SplitStore) compact() { if s.liveSetSize == 0 {