From 6f440a6420b067f698db283bb4159fa6d292580b Mon Sep 17 00:00:00 2001 From: Mike Seiler Date: Fri, 24 Mar 2023 15:49:02 -1000 Subject: [PATCH] change highly contented message pool locks to RWMutexes for performance --- chain/messagepool/check.go | 18 +++++++------- chain/messagepool/messagepool.go | 40 ++++++++++++++++---------------- chain/messagepool/selection.go | 1 + 3 files changed, 30 insertions(+), 29 deletions(-) diff --git a/chain/messagepool/check.go b/chain/messagepool/check.go index b1e2a2778..a1097e7d1 100644 --- a/chain/messagepool/check.go +++ b/chain/messagepool/check.go @@ -32,14 +32,14 @@ func (mp *MessagePool) CheckMessages(ctx context.Context, protos []*api.MessageP // CheckPendingMessages performs a set of logical sets for all messages pending from a given actor func (mp *MessagePool) CheckPendingMessages(ctx context.Context, from address.Address) ([][]api.MessageCheckStatus, error) { var msgs []*types.Message - mp.lk.Lock() + mp.lk.RLock() mset, ok := mp.pending[from] if ok { for _, sm := range mset.msgs { msgs = append(msgs, &sm.Message) } } - mp.lk.Unlock() + mp.lk.RUnlock() if len(msgs) == 0 { return nil, nil @@ -58,7 +58,7 @@ func (mp *MessagePool) CheckReplaceMessages(ctx context.Context, replace []*type msgMap := make(map[address.Address]map[uint64]*types.Message) count := 0 - mp.lk.Lock() + mp.lk.RLock() for _, m := range replace { mmap, ok := msgMap[m.From] if !ok { @@ -76,7 +76,7 @@ func (mp *MessagePool) CheckReplaceMessages(ctx context.Context, replace []*type } mmap[m.Nonce] = m } - mp.lk.Unlock() + mp.lk.RUnlock() msgs := make([]*types.Message, 0, count) start := 0 @@ -103,9 +103,9 @@ func (mp *MessagePool) checkMessages(ctx context.Context, msgs []*types.Message, if mp.api.IsLite() { return nil, nil } - mp.curTsLk.Lock() + mp.curTsLk.RLock() curTs := mp.curTs - mp.curTsLk.Unlock() + mp.curTsLk.RUnlock() epoch := curTs.Height() + 1 @@ -143,7 +143,7 @@ func (mp *MessagePool) checkMessages(ctx context.Context, msgs []*types.Message, st, ok := state[m.From] if !ok { - mp.lk.Lock() + mp.lk.RLock() mset, ok := mp.pending[m.From] if ok && !interned { st = &actorState{nextNonce: mset.nextNonce, requiredFunds: mset.requiredFunds} @@ -151,14 +151,14 @@ func (mp *MessagePool) checkMessages(ctx context.Context, msgs []*types.Message, st.requiredFunds = new(stdbig.Int).Add(st.requiredFunds, m.Message.Value.Int) } state[m.From] = st - mp.lk.Unlock() + mp.lk.RUnlock() check.OK = true check.Hint = map[string]interface{}{ "nonce": st.nextNonce, } } else { - mp.lk.Unlock() + mp.lk.RUnlock() stateNonce, err := mp.getStateNonce(ctx, m.From, curTs) if err != nil { diff --git a/chain/messagepool/messagepool.go b/chain/messagepool/messagepool.go index 0d787bd50..a7b2106bb 100644 --- a/chain/messagepool/messagepool.go +++ b/chain/messagepool/messagepool.go @@ -118,7 +118,7 @@ func init() { } type MessagePool struct { - lk sync.Mutex + lk sync.RWMutex ds dtypes.MetadataDS @@ -139,7 +139,7 @@ type MessagePool struct { keyCache map[address.Address]address.Address - curTsLk sync.Mutex // DO NOT LOCK INSIDE lk + curTsLk sync.RWMutex // DO NOT LOCK INSIDE lk curTs *types.TipSet cfgLk sync.RWMutex @@ -1001,19 +1001,19 @@ func (mp *MessagePool) addLocked(ctx context.Context, m *types.SignedMessage, st } func (mp *MessagePool) GetNonce(ctx context.Context, addr address.Address, _ types.TipSetKey) (uint64, error) { - mp.curTsLk.Lock() - defer mp.curTsLk.Unlock() + mp.curTsLk.RLock() + defer mp.curTsLk.RUnlock() - mp.lk.Lock() - defer mp.lk.Unlock() + mp.lk.RLock() + defer mp.lk.RUnlock() return mp.getNonceLocked(ctx, addr, mp.curTs) } // GetActor should not be used. It is only here to satisfy interface mess caused by lite node handling func (mp *MessagePool) GetActor(_ context.Context, addr address.Address, _ types.TipSetKey) (*types.Actor, error) { - mp.curTsLk.Lock() - defer mp.curTsLk.Unlock() + mp.curTsLk.RLock() + defer mp.curTsLk.RUnlock() return mp.api.GetActorAfter(addr, mp.curTs) } @@ -1164,11 +1164,11 @@ func (mp *MessagePool) remove(ctx context.Context, from address.Address, nonce u } func (mp *MessagePool) Pending(ctx context.Context) ([]*types.SignedMessage, *types.TipSet) { - mp.curTsLk.Lock() - defer mp.curTsLk.Unlock() + mp.curTsLk.RLock() + defer mp.curTsLk.RUnlock() - mp.lk.Lock() - defer mp.lk.Unlock() + mp.lk.RLock() + defer mp.lk.RUnlock() return mp.allPending(ctx) } @@ -1184,11 +1184,11 @@ func (mp *MessagePool) allPending(ctx context.Context) ([]*types.SignedMessage, } func (mp *MessagePool) PendingFor(ctx context.Context, a address.Address) ([]*types.SignedMessage, *types.TipSet) { - mp.curTsLk.Lock() - defer mp.curTsLk.Unlock() + mp.curTsLk.RLock() + defer mp.curTsLk.RUnlock() - mp.lk.Lock() - defer mp.lk.Unlock() + mp.lk.RLock() + defer mp.lk.RUnlock() return mp.pendingFor(ctx, a), mp.curTs } @@ -1237,9 +1237,9 @@ func (mp *MessagePool) HeadChange(ctx context.Context, revert []*types.TipSet, a maybeRepub := func(cid cid.Cid) { if !repubTrigger { - mp.lk.Lock() + mp.lk.RLock() _, republished := mp.republished[cid] - mp.lk.Unlock() + mp.lk.RUnlock() if republished { repubTrigger = true } @@ -1310,9 +1310,9 @@ func (mp *MessagePool) HeadChange(ctx context.Context, revert []*types.TipSet, a } if len(revert) > 0 && futureDebug { - mp.lk.Lock() + mp.lk.RLock() msgs, ts := mp.allPending(ctx) - mp.lk.Unlock() + mp.lk.RUnlock() buckets := map[address.Address]*statBucket{} diff --git a/chain/messagepool/selection.go b/chain/messagepool/selection.go index bd5044128..d510cf950 100644 --- a/chain/messagepool/selection.go +++ b/chain/messagepool/selection.go @@ -43,6 +43,7 @@ func (mp *MessagePool) SelectMessages(ctx context.Context, ts *types.TipSet, tq mp.curTsLk.Lock() defer mp.curTsLk.Unlock() + //TODO confirm if we can switch to RLock here for performance mp.lk.Lock() defer mp.lk.Unlock()