snake current tipset from head change notification
This commit is contained in:
parent
cb36d5b6a4
commit
748dd962d8
@ -319,8 +319,9 @@ func (s *SplitStore) Close() error {
|
||||
|
||||
func (s *SplitStore) HeadChange(revert, apply []*types.TipSet) error {
|
||||
s.mx.Lock()
|
||||
s.curTs = apply[len(apply)-1]
|
||||
epoch := s.curTs.Height()
|
||||
curTs := apply[len(apply)-1]
|
||||
epoch := curTs.Height()
|
||||
s.curTs = curTs
|
||||
s.mx.Unlock()
|
||||
|
||||
if !atomic.CompareAndSwapInt32(&s.compacting, 0, 1) {
|
||||
@ -330,14 +331,13 @@ func (s *SplitStore) HeadChange(revert, apply []*types.TipSet) error {
|
||||
|
||||
if s.warmupEpoch == 0 {
|
||||
// splitstore needs to warm up
|
||||
s.warmupEpoch = epoch
|
||||
go func() {
|
||||
defer atomic.StoreInt32(&s.compacting, 0)
|
||||
|
||||
log.Info("warming up hotstore")
|
||||
start := time.Now()
|
||||
|
||||
s.warmup()
|
||||
s.warmup(curTs)
|
||||
|
||||
log.Infow("warm up done", "took", time.Since(start))
|
||||
}()
|
||||
@ -353,7 +353,7 @@ func (s *SplitStore) HeadChange(revert, apply []*types.TipSet) error {
|
||||
log.Info("compacting splitstore")
|
||||
start := time.Now()
|
||||
|
||||
s.compact()
|
||||
s.compact(curTs)
|
||||
|
||||
log.Infow("compaction done", "took", time.Since(start))
|
||||
}()
|
||||
@ -365,11 +365,8 @@ func (s *SplitStore) HeadChange(revert, apply []*types.TipSet) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *SplitStore) warmup() {
|
||||
s.mx.Lock()
|
||||
curTs := s.curTs
|
||||
func (s *SplitStore) warmup(curTs *types.TipSet) {
|
||||
epoch := curTs.Height()
|
||||
s.mx.Unlock()
|
||||
|
||||
err := s.cs.WalkSnapshot(context.Background(), curTs, 1, s.skipOldMsgs, s.skipMsgReceipts,
|
||||
func(cid cid.Cid) error {
|
||||
@ -409,28 +406,24 @@ func (s *SplitStore) warmup() {
|
||||
}
|
||||
|
||||
// Compaction/GC Algorithm
|
||||
func (s *SplitStore) compact() {
|
||||
func (s *SplitStore) compact(curTs *types.TipSet) {
|
||||
if s.liveSetSize == 0 {
|
||||
start := time.Now()
|
||||
log.Info("estimating live set size")
|
||||
s.estimateLiveSetSize()
|
||||
s.estimateLiveSetSize(curTs)
|
||||
log.Infow("estimating live set size done", "took", time.Since(start), "size", s.liveSetSize)
|
||||
} else {
|
||||
log.Infow("current live set size estimate", "size", s.liveSetSize)
|
||||
}
|
||||
|
||||
if s.fullCompaction {
|
||||
s.compactFull()
|
||||
s.compactFull(curTs)
|
||||
} else {
|
||||
s.compactSimple()
|
||||
s.compactSimple(curTs)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *SplitStore) estimateLiveSetSize() {
|
||||
s.mx.Lock()
|
||||
curTs := s.curTs
|
||||
s.mx.Unlock()
|
||||
|
||||
func (s *SplitStore) estimateLiveSetSize(curTs *types.TipSet) {
|
||||
s.liveSetSize = 0
|
||||
err := s.cs.WalkSnapshot(context.Background(), curTs, 1, s.skipOldMsgs, s.skipMsgReceipts,
|
||||
func(cid cid.Cid) error {
|
||||
@ -444,11 +437,7 @@ func (s *SplitStore) estimateLiveSetSize() {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *SplitStore) compactSimple() {
|
||||
s.mx.Lock()
|
||||
curTs := s.curTs
|
||||
s.mx.Unlock()
|
||||
|
||||
func (s *SplitStore) compactSimple(curTs *types.TipSet) {
|
||||
coldEpoch := s.baseEpoch + CompactionCold
|
||||
|
||||
log.Infow("running simple compaction", "currentEpoch", curTs.Height(), "baseEpoch", s.baseEpoch, "coldEpoch", coldEpoch)
|
||||
@ -622,11 +611,7 @@ func (s *SplitStore) compactSimple() {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *SplitStore) compactFull() {
|
||||
s.mx.Lock()
|
||||
curTs := s.curTs
|
||||
s.mx.Unlock()
|
||||
|
||||
func (s *SplitStore) compactFull(curTs *types.TipSet) {
|
||||
epoch := curTs.Height()
|
||||
coldEpoch := s.baseEpoch + CompactionCold
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user