feat: splitstore: access warmupepoch atomically instead of locking
This commit is contained in:
parent
86723a3223
commit
92dcfe530b
@ -164,7 +164,7 @@ type SplitStore struct {
|
|||||||
path string
|
path string
|
||||||
|
|
||||||
mx sync.Mutex
|
mx sync.Mutex
|
||||||
warmupEpoch abi.ChainEpoch // protected by mx
|
warmupEpoch int64 // atomically accessed
|
||||||
baseEpoch abi.ChainEpoch // protected by compaction lock
|
baseEpoch abi.ChainEpoch // protected by compaction lock
|
||||||
pruneEpoch abi.ChainEpoch // protected by compaction lock
|
pruneEpoch abi.ChainEpoch // protected by compaction lock
|
||||||
|
|
||||||
@ -684,9 +684,7 @@ func (s *SplitStore) View(ctx context.Context, cid cid.Cid, cb func([]byte) erro
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *SplitStore) isWarm() bool {
|
func (s *SplitStore) isWarm() bool {
|
||||||
s.mx.Lock()
|
return atomic.LoadInt64(&s.warmupEpoch) > 0
|
||||||
defer s.mx.Unlock()
|
|
||||||
return s.warmupEpoch > 0
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// State tracking
|
// State tracking
|
||||||
@ -757,7 +755,7 @@ func (s *SplitStore) Start(chain ChainAccessor, us stmgr.UpgradeSchedule) error
|
|||||||
bs, err = s.ds.Get(s.ctx, warmupEpochKey)
|
bs, err = s.ds.Get(s.ctx, warmupEpochKey)
|
||||||
switch err {
|
switch err {
|
||||||
case nil:
|
case nil:
|
||||||
s.warmupEpoch = bytesToEpoch(bs)
|
atomic.StoreInt64(&s.warmupEpoch, bytesToInt64(bs))
|
||||||
|
|
||||||
case dstore.ErrNotFound:
|
case dstore.ErrNotFound:
|
||||||
warmup = true
|
warmup = true
|
||||||
@ -791,7 +789,7 @@ func (s *SplitStore) Start(chain ChainAccessor, us stmgr.UpgradeSchedule) error
|
|||||||
return xerrors.Errorf("error loading compaction index: %w", err)
|
return xerrors.Errorf("error loading compaction index: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Infow("starting splitstore", "baseEpoch", s.baseEpoch, "warmupEpoch", s.warmupEpoch)
|
log.Infow("starting splitstore", "baseEpoch", s.baseEpoch, "warmupEpoch", atomic.LoadInt64(&s.warmupEpoch))
|
||||||
|
|
||||||
if warmup {
|
if warmup {
|
||||||
err = s.warmup(curTs)
|
err = s.warmup(curTs)
|
||||||
|
@ -145,7 +145,7 @@ func (s *SplitStore) doCheck(curTs *types.TipSet) error {
|
|||||||
func (s *SplitStore) Info() map[string]interface{} {
|
func (s *SplitStore) Info() map[string]interface{} {
|
||||||
info := make(map[string]interface{})
|
info := make(map[string]interface{})
|
||||||
info["base epoch"] = s.baseEpoch
|
info["base epoch"] = s.baseEpoch
|
||||||
info["warmup epoch"] = s.warmupEpoch
|
info["warmup epoch"] = atomic.LoadInt64(&s.warmupEpoch)
|
||||||
info["compactions"] = s.compactionIndex
|
info["compactions"] = s.compactionIndex
|
||||||
info["prunes"] = s.pruneIndex
|
info["prunes"] = s.pruneIndex
|
||||||
info["compacting"] = s.compacting == 1
|
info["compacting"] = s.compacting == 1
|
||||||
|
@ -137,7 +137,7 @@ func (s *SplitStore) doWarmup(curTs *types.TipSet) error {
|
|||||||
return xerrors.Errorf("error saving warm up epoch: %w", err)
|
return xerrors.Errorf("error saving warm up epoch: %w", err)
|
||||||
}
|
}
|
||||||
s.mx.Lock()
|
s.mx.Lock()
|
||||||
s.warmupEpoch = epoch
|
atomic.StoreInt64(&s.warmupEpoch, int64(epoch))
|
||||||
s.mx.Unlock()
|
s.mx.Unlock()
|
||||||
|
|
||||||
// also save the compactionIndex, as this is used as an indicator of warmup for upgraded nodes
|
// also save the compactionIndex, as this is used as an indicator of warmup for upgraded nodes
|
||||||
|
Loading…
Reference in New Issue
Block a user