use CAS for compacting state
This commit is contained in:
parent
0af7b16ad5
commit
b0f48b500f
@ -5,6 +5,7 @@ import (
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/bmatsuo/lmdb-go/lmdb"
|
||||
@ -28,10 +29,12 @@ var baseEpochKey = dstore.NewKey("baseEpoch")
|
||||
var log = logging.Logger("splitstore")
|
||||
|
||||
type SplitStore struct {
|
||||
mx sync.Mutex
|
||||
baseEpoch abi.ChainEpoch
|
||||
curTs *types.TipSet
|
||||
compacting bool
|
||||
compacting int32
|
||||
|
||||
baseEpoch abi.ChainEpoch
|
||||
|
||||
mx sync.Mutex
|
||||
curTs *types.TipSet
|
||||
|
||||
cs *store.ChainStore
|
||||
ds dstore.Datastore
|
||||
@ -204,9 +207,9 @@ func (s *SplitStore) Start(cs *store.ChainStore) error {
|
||||
}
|
||||
|
||||
func (s *SplitStore) Close() error {
|
||||
if s.isCompacting() {
|
||||
if atomic.LoadInt32(&s.compacting) == 1 {
|
||||
log.Warn("ongoing compaction; waiting for it to finish...")
|
||||
for s.isCompacting() {
|
||||
for atomic.LoadInt32(&s.compacting) == 1 {
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
}
|
||||
@ -220,10 +223,14 @@ func (s *SplitStore) HeadChange(revert, apply []*types.TipSet) error {
|
||||
epoch := s.curTs.Height()
|
||||
s.mx.Unlock()
|
||||
|
||||
if !s.isCompacting() && epoch-s.baseEpoch > CompactionThreshold {
|
||||
s.setCompacting(true)
|
||||
if !atomic.CompareAndSwapInt32(&s.compacting, 0, 1) {
|
||||
// we are currently compacting, do nothing and wait for the next head change
|
||||
return nil
|
||||
}
|
||||
|
||||
if epoch-s.baseEpoch > CompactionThreshold {
|
||||
go func() {
|
||||
defer s.setCompacting(false)
|
||||
defer atomic.StoreInt32(&s.compacting, 0)
|
||||
|
||||
log.Info("compacting splitstore")
|
||||
start := time.Now()
|
||||
@ -232,23 +239,14 @@ func (s *SplitStore) HeadChange(revert, apply []*types.TipSet) error {
|
||||
|
||||
log.Infow("compaction done", "took", time.Since(start))
|
||||
}()
|
||||
} else {
|
||||
// no compaction necessary
|
||||
atomic.StoreInt32(&s.compacting, 0)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *SplitStore) isCompacting() bool {
|
||||
s.mx.Lock()
|
||||
defer s.mx.Unlock()
|
||||
return s.compacting
|
||||
}
|
||||
|
||||
func (s *SplitStore) setCompacting(state bool) {
|
||||
s.mx.Lock()
|
||||
defer s.mx.Unlock()
|
||||
s.compacting = state
|
||||
}
|
||||
|
||||
// Compaction/GC Algorithm
|
||||
func (s *SplitStore) compact() {
|
||||
// create two on disk live sets, one for marking the cold finality region
|
||||
|
Loading…
Reference in New Issue
Block a user