From b0f48b500f4bf9450fed11bf0ce34ecd33e1554c Mon Sep 17 00:00:00 2001 From: vyzo Date: Sun, 29 Nov 2020 15:10:30 +0200 Subject: [PATCH] use CAS for compacting state --- chain/store/splitstore/splitstore.go | 40 +++++++++++++--------------- 1 file changed, 19 insertions(+), 21 deletions(-) diff --git a/chain/store/splitstore/splitstore.go b/chain/store/splitstore/splitstore.go index 3adc589ad..3b89bb7c6 100644 --- a/chain/store/splitstore/splitstore.go +++ b/chain/store/splitstore/splitstore.go @@ -5,6 +5,7 @@ import ( "encoding/binary" "errors" "sync" + "sync/atomic" "time" "github.com/bmatsuo/lmdb-go/lmdb" @@ -28,10 +29,12 @@ var baseEpochKey = dstore.NewKey("baseEpoch") var log = logging.Logger("splitstore") type SplitStore struct { - mx sync.Mutex - baseEpoch abi.ChainEpoch - curTs *types.TipSet - compacting bool + compacting int32 + + baseEpoch abi.ChainEpoch + + mx sync.Mutex + curTs *types.TipSet cs *store.ChainStore ds dstore.Datastore @@ -204,9 +207,9 @@ func (s *SplitStore) Start(cs *store.ChainStore) error { } func (s *SplitStore) Close() error { - if s.isCompacting() { + if atomic.LoadInt32(&s.compacting) == 1 { log.Warn("ongoing compaction; waiting for it to finish...") - for s.isCompacting() { + for atomic.LoadInt32(&s.compacting) == 1 { time.Sleep(time.Second) } } @@ -220,10 +223,14 @@ func (s *SplitStore) HeadChange(revert, apply []*types.TipSet) error { epoch := s.curTs.Height() s.mx.Unlock() - if !s.isCompacting() && epoch-s.baseEpoch > CompactionThreshold { - s.setCompacting(true) + if !atomic.CompareAndSwapInt32(&s.compacting, 0, 1) { + // we are currently compacting, do nothing and wait for the next head change + return nil + } + + if epoch-s.baseEpoch > CompactionThreshold { go func() { - defer s.setCompacting(false) + defer atomic.StoreInt32(&s.compacting, 0) log.Info("compacting splitstore") start := time.Now() @@ -232,23 +239,14 @@ func (s *SplitStore) HeadChange(revert, apply []*types.TipSet) error { log.Infow("compaction done", "took", time.Since(start)) }() + } else { + // no compaction necessary + atomic.StoreInt32(&s.compacting, 0) } return nil } -func (s *SplitStore) isCompacting() bool { - s.mx.Lock() - defer s.mx.Unlock() - return s.compacting -} - -func (s *SplitStore) setCompacting(state bool) { - s.mx.Lock() - defer s.mx.Unlock() - s.compacting = state -} - // Compaction/GC Algorithm func (s *SplitStore) compact() { // create two on disk live sets, one for marking the cold finality region