use a missing compactionIndex as an indicator for warmup
so that splitstore v0 nodes upgrading will get a fresh warmup.
This commit is contained in:
parent
669b47cfc9
commit
ff093fae00
@ -485,6 +485,9 @@ func (s *SplitStore) Start(chain ChainAccessor) error {
|
|||||||
s.chain = chain
|
s.chain = chain
|
||||||
curTs := chain.GetHeaviestTipSet()
|
curTs := chain.GetHeaviestTipSet()
|
||||||
|
|
||||||
|
// should we warmup
|
||||||
|
warmup := false
|
||||||
|
|
||||||
// load base epoch from metadata ds
|
// load base epoch from metadata ds
|
||||||
// if none, then use current epoch because it's a fresh start
|
// if none, then use current epoch because it's a fresh start
|
||||||
bs, err := s.ds.Get(baseEpochKey)
|
bs, err := s.ds.Get(baseEpochKey)
|
||||||
@ -514,11 +517,7 @@ func (s *SplitStore) Start(chain ChainAccessor) error {
|
|||||||
s.warmupEpoch = bytesToEpoch(bs)
|
s.warmupEpoch = bytesToEpoch(bs)
|
||||||
|
|
||||||
case dstore.ErrNotFound:
|
case dstore.ErrNotFound:
|
||||||
// the hotstore hasn't warmed up, start a concurrent warm up
|
warmup = true
|
||||||
err = s.warmup(curTs)
|
|
||||||
if err != nil {
|
|
||||||
return xerrors.Errorf("error warming up: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
default:
|
default:
|
||||||
return xerrors.Errorf("error loading warmup epoch: %w", err)
|
return xerrors.Errorf("error loading warmup epoch: %w", err)
|
||||||
@ -542,12 +541,22 @@ func (s *SplitStore) Start(chain ChainAccessor) error {
|
|||||||
s.compactionIndex = bytesToInt64(bs)
|
s.compactionIndex = bytesToInt64(bs)
|
||||||
|
|
||||||
case dstore.ErrNotFound:
|
case dstore.ErrNotFound:
|
||||||
|
// this is potentially an upgrade from splitstore v0; schedule a warmup as v0 has
|
||||||
|
// some issues with hot references leaking into the coldstore.
|
||||||
|
warmup = true
|
||||||
default:
|
default:
|
||||||
return xerrors.Errorf("error loading compaction index: %w", err)
|
return xerrors.Errorf("error loading compaction index: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Infow("starting splitstore", "baseEpoch", s.baseEpoch, "warmupEpoch", s.warmupEpoch)
|
log.Infow("starting splitstore", "baseEpoch", s.baseEpoch, "warmupEpoch", s.warmupEpoch)
|
||||||
|
|
||||||
|
if warmup {
|
||||||
|
err = s.warmup(curTs)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("error starting warmup: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// watch the chain
|
// watch the chain
|
||||||
chain.SubscribeHeadChanges(s.HeadChange)
|
chain.SubscribeHeadChanges(s.HeadChange)
|
||||||
|
|
||||||
@ -959,6 +968,12 @@ func (s *SplitStore) doWarmup(curTs *types.TipSet) error {
|
|||||||
s.warmupEpoch = epoch
|
s.warmupEpoch = epoch
|
||||||
s.mx.Unlock()
|
s.mx.Unlock()
|
||||||
|
|
||||||
|
// also save the compactionIndex, as this is used as an indicator of warmup for upgraded nodes
|
||||||
|
err = s.ds.Put(compactionIndexKey, int64ToBytes(s.compactionIndex))
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("error saving compaction index: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user