wait for sync in a non racey way
This commit is contained in:
parent
578b5691bc
commit
7b4ab2077b
@ -157,6 +157,9 @@ type SplitStore struct {
|
|||||||
txnRefs map[cid.Cid]struct{}
|
txnRefs map[cid.Cid]struct{}
|
||||||
txnMissing map[cid.Cid]struct{}
|
txnMissing map[cid.Cid]struct{}
|
||||||
txnMarkSet MarkSet
|
txnMarkSet MarkSet
|
||||||
|
txnSyncMx sync.Mutex
|
||||||
|
txnSyncCond sync.Cond
|
||||||
|
txnSync bool
|
||||||
|
|
||||||
// registered protectors
|
// registered protectors
|
||||||
protectors []func(func(cid.Cid) error) error
|
protectors []func(func(cid.Cid) error) error
|
||||||
@ -196,6 +199,7 @@ func Open(path string, ds dstore.Datastore, hot, cold bstore.Blockstore, cfg *Co
|
|||||||
}
|
}
|
||||||
|
|
||||||
ss.txnViewsCond.L = &ss.txnViewsMx
|
ss.txnViewsCond.L = &ss.txnViewsMx
|
||||||
|
ss.txnSyncCond.L = &ss.txnSyncMx
|
||||||
ss.ctx, ss.cancel = context.WithCancel(context.Background())
|
ss.ctx, ss.cancel = context.WithCancel(context.Background())
|
||||||
|
|
||||||
if enableDebugLog {
|
if enableDebugLog {
|
||||||
|
@ -162,8 +162,15 @@ func (s *SplitStore) protectTipSets(apply []*types.TipSet) {
|
|||||||
// critical section
|
// critical section
|
||||||
if s.txnMarkSet != nil {
|
if s.txnMarkSet != nil {
|
||||||
go func() {
|
go func() {
|
||||||
|
defer func() {
|
||||||
|
s.txnSyncMx.Lock()
|
||||||
|
defer s.txnSyncMx.Unlock()
|
||||||
|
s.txnSync = true
|
||||||
|
s.txnSyncCond.Broadcast()
|
||||||
|
}()
|
||||||
defer s.txnLk.RUnlock()
|
defer s.txnLk.RUnlock()
|
||||||
s.markLiveRefs(cids)
|
s.markLiveRefs(cids)
|
||||||
|
|
||||||
}()
|
}()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -663,9 +670,8 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// wait for the head to catch up so that all messages in the current head are protected
|
// wait for the head to catch up so that all messages are protected
|
||||||
log.Infof("waiting %s for sync", SyncWaitTime)
|
s.waitForSync()
|
||||||
time.Sleep(SyncWaitTime)
|
|
||||||
|
|
||||||
if err := s.checkClosing(); err != nil {
|
if err := s.checkClosing(); err != nil {
|
||||||
return err
|
return err
|
||||||
@ -731,6 +737,7 @@ func (s *SplitStore) beginTxnProtect() {
|
|||||||
defer s.txnLk.Unlock()
|
defer s.txnLk.Unlock()
|
||||||
|
|
||||||
s.txnActive = true
|
s.txnActive = true
|
||||||
|
s.txnSync = false
|
||||||
s.txnRefs = make(map[cid.Cid]struct{})
|
s.txnRefs = make(map[cid.Cid]struct{})
|
||||||
s.txnMissing = make(map[cid.Cid]struct{})
|
s.txnMissing = make(map[cid.Cid]struct{})
|
||||||
}
|
}
|
||||||
@ -762,6 +769,15 @@ func (s *SplitStore) beginCriticalSection(markSet MarkSet) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *SplitStore) waitForSync() {
|
||||||
|
s.txnSyncMx.Lock()
|
||||||
|
defer s.txnSyncMx.Unlock()
|
||||||
|
|
||||||
|
for !s.txnSync {
|
||||||
|
s.txnSyncCond.Wait()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (s *SplitStore) endTxnProtect() {
|
func (s *SplitStore) endTxnProtect() {
|
||||||
s.txnLk.Lock()
|
s.txnLk.Lock()
|
||||||
defer s.txnLk.Unlock()
|
defer s.txnLk.Unlock()
|
||||||
@ -771,6 +787,7 @@ func (s *SplitStore) endTxnProtect() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
s.txnActive = false
|
s.txnActive = false
|
||||||
|
s.txnSync = false
|
||||||
s.txnRefs = nil
|
s.txnRefs = nil
|
||||||
s.txnMissing = nil
|
s.txnMissing = nil
|
||||||
s.txnMarkSet = nil
|
s.txnMarkSet = nil
|
||||||
|
Loading…
Reference in New Issue
Block a user