change highly contented message pool locks to RWMutexes for performance
This commit is contained in:
parent
41fce94db4
commit
6f440a6420
@ -32,14 +32,14 @@ func (mp *MessagePool) CheckMessages(ctx context.Context, protos []*api.MessageP
|
||||
// CheckPendingMessages performs a set of logical sets for all messages pending from a given actor
|
||||
func (mp *MessagePool) CheckPendingMessages(ctx context.Context, from address.Address) ([][]api.MessageCheckStatus, error) {
|
||||
var msgs []*types.Message
|
||||
mp.lk.Lock()
|
||||
mp.lk.RLock()
|
||||
mset, ok := mp.pending[from]
|
||||
if ok {
|
||||
for _, sm := range mset.msgs {
|
||||
msgs = append(msgs, &sm.Message)
|
||||
}
|
||||
}
|
||||
mp.lk.Unlock()
|
||||
mp.lk.RUnlock()
|
||||
|
||||
if len(msgs) == 0 {
|
||||
return nil, nil
|
||||
@ -58,7 +58,7 @@ func (mp *MessagePool) CheckReplaceMessages(ctx context.Context, replace []*type
|
||||
msgMap := make(map[address.Address]map[uint64]*types.Message)
|
||||
count := 0
|
||||
|
||||
mp.lk.Lock()
|
||||
mp.lk.RLock()
|
||||
for _, m := range replace {
|
||||
mmap, ok := msgMap[m.From]
|
||||
if !ok {
|
||||
@ -76,7 +76,7 @@ func (mp *MessagePool) CheckReplaceMessages(ctx context.Context, replace []*type
|
||||
}
|
||||
mmap[m.Nonce] = m
|
||||
}
|
||||
mp.lk.Unlock()
|
||||
mp.lk.RUnlock()
|
||||
|
||||
msgs := make([]*types.Message, 0, count)
|
||||
start := 0
|
||||
@ -103,9 +103,9 @@ func (mp *MessagePool) checkMessages(ctx context.Context, msgs []*types.Message,
|
||||
if mp.api.IsLite() {
|
||||
return nil, nil
|
||||
}
|
||||
mp.curTsLk.Lock()
|
||||
mp.curTsLk.RLock()
|
||||
curTs := mp.curTs
|
||||
mp.curTsLk.Unlock()
|
||||
mp.curTsLk.RUnlock()
|
||||
|
||||
epoch := curTs.Height() + 1
|
||||
|
||||
@ -143,7 +143,7 @@ func (mp *MessagePool) checkMessages(ctx context.Context, msgs []*types.Message,
|
||||
|
||||
st, ok := state[m.From]
|
||||
if !ok {
|
||||
mp.lk.Lock()
|
||||
mp.lk.RLock()
|
||||
mset, ok := mp.pending[m.From]
|
||||
if ok && !interned {
|
||||
st = &actorState{nextNonce: mset.nextNonce, requiredFunds: mset.requiredFunds}
|
||||
@ -151,14 +151,14 @@ func (mp *MessagePool) checkMessages(ctx context.Context, msgs []*types.Message,
|
||||
st.requiredFunds = new(stdbig.Int).Add(st.requiredFunds, m.Message.Value.Int)
|
||||
}
|
||||
state[m.From] = st
|
||||
mp.lk.Unlock()
|
||||
mp.lk.RUnlock()
|
||||
|
||||
check.OK = true
|
||||
check.Hint = map[string]interface{}{
|
||||
"nonce": st.nextNonce,
|
||||
}
|
||||
} else {
|
||||
mp.lk.Unlock()
|
||||
mp.lk.RUnlock()
|
||||
|
||||
stateNonce, err := mp.getStateNonce(ctx, m.From, curTs)
|
||||
if err != nil {
|
||||
|
@ -118,7 +118,7 @@ func init() {
|
||||
}
|
||||
|
||||
type MessagePool struct {
|
||||
lk sync.Mutex
|
||||
lk sync.RWMutex
|
||||
|
||||
ds dtypes.MetadataDS
|
||||
|
||||
@ -139,7 +139,7 @@ type MessagePool struct {
|
||||
|
||||
keyCache map[address.Address]address.Address
|
||||
|
||||
curTsLk sync.Mutex // DO NOT LOCK INSIDE lk
|
||||
curTsLk sync.RWMutex // DO NOT LOCK INSIDE lk
|
||||
curTs *types.TipSet
|
||||
|
||||
cfgLk sync.RWMutex
|
||||
@ -1001,19 +1001,19 @@ func (mp *MessagePool) addLocked(ctx context.Context, m *types.SignedMessage, st
|
||||
}
|
||||
|
||||
func (mp *MessagePool) GetNonce(ctx context.Context, addr address.Address, _ types.TipSetKey) (uint64, error) {
|
||||
mp.curTsLk.Lock()
|
||||
defer mp.curTsLk.Unlock()
|
||||
mp.curTsLk.RLock()
|
||||
defer mp.curTsLk.RUnlock()
|
||||
|
||||
mp.lk.Lock()
|
||||
defer mp.lk.Unlock()
|
||||
mp.lk.RLock()
|
||||
defer mp.lk.RUnlock()
|
||||
|
||||
return mp.getNonceLocked(ctx, addr, mp.curTs)
|
||||
}
|
||||
|
||||
// GetActor should not be used. It is only here to satisfy interface mess caused by lite node handling
|
||||
func (mp *MessagePool) GetActor(_ context.Context, addr address.Address, _ types.TipSetKey) (*types.Actor, error) {
|
||||
mp.curTsLk.Lock()
|
||||
defer mp.curTsLk.Unlock()
|
||||
mp.curTsLk.RLock()
|
||||
defer mp.curTsLk.RUnlock()
|
||||
return mp.api.GetActorAfter(addr, mp.curTs)
|
||||
}
|
||||
|
||||
@ -1164,11 +1164,11 @@ func (mp *MessagePool) remove(ctx context.Context, from address.Address, nonce u
|
||||
}
|
||||
|
||||
func (mp *MessagePool) Pending(ctx context.Context) ([]*types.SignedMessage, *types.TipSet) {
|
||||
mp.curTsLk.Lock()
|
||||
defer mp.curTsLk.Unlock()
|
||||
mp.curTsLk.RLock()
|
||||
defer mp.curTsLk.RUnlock()
|
||||
|
||||
mp.lk.Lock()
|
||||
defer mp.lk.Unlock()
|
||||
mp.lk.RLock()
|
||||
defer mp.lk.RUnlock()
|
||||
|
||||
return mp.allPending(ctx)
|
||||
}
|
||||
@ -1184,11 +1184,11 @@ func (mp *MessagePool) allPending(ctx context.Context) ([]*types.SignedMessage,
|
||||
}
|
||||
|
||||
func (mp *MessagePool) PendingFor(ctx context.Context, a address.Address) ([]*types.SignedMessage, *types.TipSet) {
|
||||
mp.curTsLk.Lock()
|
||||
defer mp.curTsLk.Unlock()
|
||||
mp.curTsLk.RLock()
|
||||
defer mp.curTsLk.RUnlock()
|
||||
|
||||
mp.lk.Lock()
|
||||
defer mp.lk.Unlock()
|
||||
mp.lk.RLock()
|
||||
defer mp.lk.RUnlock()
|
||||
return mp.pendingFor(ctx, a), mp.curTs
|
||||
}
|
||||
|
||||
@ -1237,9 +1237,9 @@ func (mp *MessagePool) HeadChange(ctx context.Context, revert []*types.TipSet, a
|
||||
|
||||
maybeRepub := func(cid cid.Cid) {
|
||||
if !repubTrigger {
|
||||
mp.lk.Lock()
|
||||
mp.lk.RLock()
|
||||
_, republished := mp.republished[cid]
|
||||
mp.lk.Unlock()
|
||||
mp.lk.RUnlock()
|
||||
if republished {
|
||||
repubTrigger = true
|
||||
}
|
||||
@ -1310,9 +1310,9 @@ func (mp *MessagePool) HeadChange(ctx context.Context, revert []*types.TipSet, a
|
||||
}
|
||||
|
||||
if len(revert) > 0 && futureDebug {
|
||||
mp.lk.Lock()
|
||||
mp.lk.RLock()
|
||||
msgs, ts := mp.allPending(ctx)
|
||||
mp.lk.Unlock()
|
||||
mp.lk.RUnlock()
|
||||
|
||||
buckets := map[address.Address]*statBucket{}
|
||||
|
||||
|
@ -43,6 +43,7 @@ func (mp *MessagePool) SelectMessages(ctx context.Context, ts *types.TipSet, tq
|
||||
mp.curTsLk.Lock()
|
||||
defer mp.curTsLk.Unlock()
|
||||
|
||||
//TODO confirm if we can switch to RLock here for performance
|
||||
mp.lk.Lock()
|
||||
defer mp.lk.Unlock()
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user