Merge pull request #10561 from filecoin-project/mikers/messagepoolRWMutexes
perf: message pool: change locks to RWMutexes for performance
Commit 139bde3771
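The change is mechanical but worth spelling out: paths that only read pool state switch from the exclusive Lock/Unlock of a sync.Mutex to RLock/RUnlock on a sync.RWMutex, so concurrent readers no longer serialize behind one another, while mutating paths keep exclusive access. A minimal sketch of the pattern, with illustrative names (not the lotus code):

// Read-only accessors take RLock so they can run concurrently;
// mutating paths keep the exclusive Lock.
package main

import (
	"fmt"
	"sync"
)

type pool struct {
	lk      sync.RWMutex
	pending map[string]int
}

// read path: any number of goroutines may hold the read lock at once
func (p *pool) count(from string) int {
	p.lk.RLock()
	defer p.lk.RUnlock()
	return p.pending[from]
}

// write path: still exclusive, blocking readers and other writers
func (p *pool) add(from string) {
	p.lk.Lock()
	defer p.lk.Unlock()
	p.pending[from]++
}

func main() {
	p := &pool{pending: map[string]int{}}
	p.add("t0100")
	fmt.Println(p.count("t0100")) // 1
}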
@@ -32,14 +32,14 @@ func (mp *MessagePool) CheckMessages(ctx context.Context, protos []*api.MessageP
 // CheckPendingMessages performs a set of logical sets for all messages pending from a given actor
 func (mp *MessagePool) CheckPendingMessages(ctx context.Context, from address.Address) ([][]api.MessageCheckStatus, error) {
 	var msgs []*types.Message
-	mp.lk.Lock()
+	mp.lk.RLock()
 	mset, ok := mp.pending[from]
 	if ok {
 		for _, sm := range mset.msgs {
 			msgs = append(msgs, &sm.Message)
 		}
 	}
-	mp.lk.Unlock()
+	mp.lk.RUnlock()
 
 	if len(msgs) == 0 {
 		return nil, nil
@@ -58,7 +58,7 @@ func (mp *MessagePool) CheckReplaceMessages(ctx context.Context, replace []*type
 	msgMap := make(map[address.Address]map[uint64]*types.Message)
 	count := 0
 
-	mp.lk.Lock()
+	mp.lk.RLock()
 	for _, m := range replace {
 		mmap, ok := msgMap[m.From]
 		if !ok {
@@ -76,7 +76,7 @@ func (mp *MessagePool) CheckReplaceMessages(ctx context.Context, replace []*type
 		}
 		mmap[m.Nonce] = m
 	}
-	mp.lk.Unlock()
+	mp.lk.RUnlock()
 
 	msgs := make([]*types.Message, 0, count)
 	start := 0
@@ -103,9 +103,9 @@ func (mp *MessagePool) checkMessages(ctx context.Context, msgs []*types.Message,
 	if mp.api.IsLite() {
 		return nil, nil
 	}
-	mp.curTsLk.Lock()
+	mp.curTsLk.RLock()
 	curTs := mp.curTs
-	mp.curTsLk.Unlock()
+	mp.curTsLk.RUnlock()
 
 	epoch := curTs.Height() + 1
 
@@ -143,7 +143,7 @@ func (mp *MessagePool) checkMessages(ctx context.Context, msgs []*types.Message,
 
 		st, ok := state[m.From]
 		if !ok {
-			mp.lk.Lock()
+			mp.lk.RLock()
 			mset, ok := mp.pending[m.From]
 			if ok && !interned {
 				st = &actorState{nextNonce: mset.nextNonce, requiredFunds: mset.requiredFunds}
@@ -151,14 +151,14 @@ func (mp *MessagePool) checkMessages(ctx context.Context, msgs []*types.Message,
 					st.requiredFunds = new(stdbig.Int).Add(st.requiredFunds, m.Message.Value.Int)
 				}
 				state[m.From] = st
-				mp.lk.Unlock()
+				mp.lk.RUnlock()
 
 				check.OK = true
 				check.Hint = map[string]interface{}{
 					"nonce": st.nextNonce,
 				}
 			} else {
-				mp.lk.Unlock()
+				mp.lk.RUnlock()
 
 				stateNonce, err := mp.getStateNonce(ctx, m.From, curTs)
 				if err != nil {
@@ -118,7 +118,7 @@ func init() {
 }
 
 type MessagePool struct {
-	lk sync.Mutex
+	lk sync.RWMutex
 
 	ds dtypes.MetadataDS
 
@@ -137,9 +137,9 @@ type MessagePool struct {
 	// do NOT access this map directly, use getPendingMset, setPendingMset, deletePendingMset, forEachPending, and clearPending respectively
 	pending map[address.Address]*msgSet
 
-	keyCache map[address.Address]address.Address
+	keyCache *lru.Cache[address.Address, address.Address]
 
-	curTsLk sync.Mutex // DO NOT LOCK INSIDE lk
+	curTsLk sync.RWMutex // DO NOT LOCK INSIDE lk
 	curTs   *types.TipSet
 
 	cfgLk sync.RWMutex
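Both pool mutexes become RWMutexes, and the ordering rule in the existing comment still applies: curTsLk is never taken while already holding lk. A tiny illustrative sketch (hypothetical names, not the lotus code) of that ordering discipline, which the read paths later in this diff (GetNonce, Pending, PendingFor) follow with read locks:

// Every path that needs both mutexes takes curTsLk first and lk second,
// so no two goroutines can hold them in opposite orders and deadlock.
package main

import (
	"fmt"
	"sync"
)

type pool struct {
	lk      sync.RWMutex // protects pending; never taken before curTsLk
	curTsLk sync.RWMutex // protects curTs; always the outer lock
	curTs   int
	pending map[string]int
}

func (p *pool) snapshot(from string) (int, int) {
	p.curTsLk.RLock() // outer lock first
	defer p.curTsLk.RUnlock()

	p.lk.RLock() // inner lock second, never the other way around
	defer p.lk.RUnlock()

	return p.curTs, p.pending[from]
}

func main() {
	p := &pool{curTs: 100, pending: map[string]int{"t0100": 3}}
	fmt.Println(p.snapshot("t0100"))
}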
@@ -372,6 +372,7 @@ func New(ctx context.Context, api Provider, ds dtypes.MetadataDS, us stmgr.Upgra
 	cache, _ := lru.New2Q[cid.Cid, crypto.Signature](build.BlsSignatureCacheSize)
 	verifcache, _ := lru.New2Q[string, struct{}](build.VerifSigCacheSize)
 	noncecache, _ := lru.New[nonceCacheKey, uint64](256)
+	keycache, _ := lru.New[address.Address, address.Address](1_000_000)
 
 	cfg, err := loadConfig(ctx, ds)
 	if err != nil {
@@ -390,7 +391,7 @@ func New(ctx context.Context, api Provider, ds dtypes.MetadataDS, us stmgr.Upgra
 		repubTrigger:   make(chan struct{}, 1),
 		localAddrs:     make(map[address.Address]struct{}),
 		pending:        make(map[address.Address]*msgSet),
-		keyCache:       make(map[address.Address]address.Address),
+		keyCache:       keycache,
 		minGasPrice:    types.NewInt(0),
 		getNtwkVersion: us.GetNtwkVersion,
 		pruneTrigger:   make(chan struct{}, 1),
@@ -474,8 +475,8 @@ func (mp *MessagePool) TryForEachPendingMessage(f func(cid.Cid) error) error {
 
 func (mp *MessagePool) resolveToKey(ctx context.Context, addr address.Address) (address.Address, error) {
 	// check the cache
-	a, f := mp.keyCache[addr]
-	if f {
+	a, ok := mp.keyCache.Get(addr)
+	if ok {
 		return a, nil
 	}
 
@@ -486,8 +487,8 @@ func (mp *MessagePool) resolveToKey(ctx context.Context, addr address.Address) (
 	}
 
 	// place both entries in the cache (may both be key addresses, which is fine)
-	mp.keyCache[addr] = ka
-	mp.keyCache[ka] = ka
+	mp.keyCache.Add(addr, ka)
+	mp.keyCache.Add(ka, ka)
 
 	return ka, nil
 }
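keyCache moves from an unbounded map to a generic LRU cache, so resolved key addresses are still memoized but memory use is capped at the configured size (1,000,000 entries above). A small, self-contained usage sketch, assuming the generic github.com/hashicorp/golang-lru/v2 package that the lru.New[K, V] call in this diff points to; string keys stand in for address.Address:

package main

import (
	"fmt"

	lru "github.com/hashicorp/golang-lru/v2"
)

func main() {
	// bounded cache: at most two entries, least recently used evicted first
	cache, err := lru.New[string, string](2)
	if err != nil {
		panic(err)
	}

	cache.Add("t01001", "t3abc") // ID address -> key address (placeholder values)
	cache.Add("t3abc", "t3abc")  // a key address maps to itself, as in resolveToKey

	if ka, ok := cache.Get("t01001"); ok {
		fmt.Println("resolved:", ka)
	}

	cache.Add("t01002", "t3def") // pushes out the least recently used entry
	fmt.Println("entries:", cache.Len())
}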
@@ -763,7 +764,28 @@ func (mp *MessagePool) Add(ctx context.Context, m *types.SignedMessage) error {
 		<-mp.addSema
 	}()
 
+	mp.curTsLk.RLock()
+	tmpCurTs := mp.curTs
+	mp.curTsLk.RUnlock()
+
+	//ensures computations are cached without holding lock
+	_, _ = mp.api.GetActorAfter(m.Message.From, tmpCurTs)
+	_, _ = mp.getStateNonce(ctx, m.Message.From, tmpCurTs)
+
 	mp.curTsLk.Lock()
+	if tmpCurTs == mp.curTs {
+		//with the lock enabled, mp.curTs is the same Ts as we just had, so we know that our computations are cached
+	} else {
+		//curTs has been updated so we want to cache the new one:
+		tmpCurTs = mp.curTs
+		//we want to release the lock, cache the computations then grab it again
+		mp.curTsLk.Unlock()
+		_, _ = mp.api.GetActorAfter(m.Message.From, tmpCurTs)
+		_, _ = mp.getStateNonce(ctx, m.Message.From, tmpCurTs)
+		mp.curTsLk.Lock()
+		//now that we have the lock, we continue, we could do this as a loop forever, but that's bad to loop forever, and this was added as an optimization and it seems once is enough because the computation < block time
+	}
+
 	defer mp.curTsLk.Unlock()
 
 	_, err = mp.addTs(ctx, m, mp.curTs, false, false)
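The new code in Add() is a warm-then-recheck pattern: snapshot curTs under a read lock, run the expensive actor and nonce lookups with no lock held so their results land in caches, then take the write lock and repeat the warm-up once if the head moved in the meantime. A self-contained sketch of the same shape, with hypothetical names standing in for GetActorAfter/getStateNonce:

package main

import (
	"fmt"
	"sync"
	"time"
)

type tipset struct{ height int }

type pool struct {
	curTsLk sync.RWMutex
	curTs   *tipset
}

// stand-in for the expensive, cacheable state lookups
func warmCaches(ts *tipset, from string) {
	time.Sleep(10 * time.Millisecond)
	fmt.Printf("warmed caches for %s at height %d\n", from, ts.height)
}

func (p *pool) add(from string) {
	// snapshot the head under a read lock only
	p.curTsLk.RLock()
	tmp := p.curTs
	p.curTsLk.RUnlock()

	// expensive work runs with no lock held
	warmCaches(tmp, from)

	p.curTsLk.Lock()
	if tmp != p.curTs {
		// the head moved while we were warming: warm once more for the
		// new head, again without holding the write lock
		tmp = p.curTs
		p.curTsLk.Unlock()
		warmCaches(tmp, from)
		p.curTsLk.Lock()
	}
	defer p.curTsLk.Unlock()

	// proceed with the add; its lookups should now hit warm caches
	fmt.Println("adding message from", from, "at height", p.curTs.height)
}

func main() {
	p := &pool{curTs: &tipset{height: 100}}
	p.add("t0100")
}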
@@ -852,9 +874,6 @@ func (mp *MessagePool) addTs(ctx context.Context, m *types.SignedMessage, curTs
 		return false, xerrors.Errorf("minimum expected nonce is %d: %w", snonce, ErrNonceTooLow)
 	}
 
-	mp.lk.Lock()
-	defer mp.lk.Unlock()
-
 	senderAct, err := mp.api.GetActorAfter(m.Message.From, curTs)
 	if err != nil {
 		return false, xerrors.Errorf("failed to get sender actor: %w", err)
@@ -869,6 +888,9 @@ func (mp *MessagePool) addTs(ctx context.Context, m *types.SignedMessage, curTs
 		return false, xerrors.Errorf("sender actor %s is not a valid top-level sender", m.Message.From)
 	}
 
+	mp.lk.Lock()
+	defer mp.lk.Unlock()
+
 	publish, err := mp.verifyMsgBeforeAdd(ctx, m, curTs, local)
 	if err != nil {
 		return false, xerrors.Errorf("verify msg failed: %w", err)
@@ -1001,19 +1023,19 @@ func (mp *MessagePool) addLocked(ctx context.Context, m *types.SignedMessage, st
 }
 
 func (mp *MessagePool) GetNonce(ctx context.Context, addr address.Address, _ types.TipSetKey) (uint64, error) {
-	mp.curTsLk.Lock()
-	defer mp.curTsLk.Unlock()
+	mp.curTsLk.RLock()
+	defer mp.curTsLk.RUnlock()
 
-	mp.lk.Lock()
-	defer mp.lk.Unlock()
+	mp.lk.RLock()
+	defer mp.lk.RUnlock()
 
 	return mp.getNonceLocked(ctx, addr, mp.curTs)
 }
 
 // GetActor should not be used. It is only here to satisfy interface mess caused by lite node handling
 func (mp *MessagePool) GetActor(_ context.Context, addr address.Address, _ types.TipSetKey) (*types.Actor, error) {
-	mp.curTsLk.Lock()
-	defer mp.curTsLk.Unlock()
+	mp.curTsLk.RLock()
+	defer mp.curTsLk.RUnlock()
 	return mp.api.GetActorAfter(addr, mp.curTs)
 }
 
@@ -1164,11 +1186,11 @@ func (mp *MessagePool) remove(ctx context.Context, from address.Address, nonce u
 }
 
 func (mp *MessagePool) Pending(ctx context.Context) ([]*types.SignedMessage, *types.TipSet) {
-	mp.curTsLk.Lock()
-	defer mp.curTsLk.Unlock()
+	mp.curTsLk.RLock()
+	defer mp.curTsLk.RUnlock()
 
-	mp.lk.Lock()
-	defer mp.lk.Unlock()
+	mp.lk.RLock()
+	defer mp.lk.RUnlock()
 
 	return mp.allPending(ctx)
 }
@@ -1184,11 +1206,11 @@ func (mp *MessagePool) allPending(ctx context.Context) ([]*types.SignedMessage,
 }
 
 func (mp *MessagePool) PendingFor(ctx context.Context, a address.Address) ([]*types.SignedMessage, *types.TipSet) {
-	mp.curTsLk.Lock()
-	defer mp.curTsLk.Unlock()
+	mp.curTsLk.RLock()
+	defer mp.curTsLk.RUnlock()
 
-	mp.lk.Lock()
-	defer mp.lk.Unlock()
+	mp.lk.RLock()
+	defer mp.lk.RUnlock()
 	return mp.pendingFor(ctx, a), mp.curTs
 }
 
@@ -1237,9 +1259,9 @@ func (mp *MessagePool) HeadChange(ctx context.Context, revert []*types.TipSet, a
 
 	maybeRepub := func(cid cid.Cid) {
 		if !repubTrigger {
-			mp.lk.Lock()
+			mp.lk.RLock()
 			_, republished := mp.republished[cid]
-			mp.lk.Unlock()
+			mp.lk.RUnlock()
 			if republished {
 				repubTrigger = true
 			}
@@ -1310,9 +1332,9 @@ func (mp *MessagePool) HeadChange(ctx context.Context, revert []*types.TipSet, a
 	}
 
 	if len(revert) > 0 && futureDebug {
-		mp.lk.Lock()
+		mp.lk.RLock()
 		msgs, ts := mp.allPending(ctx)
-		mp.lk.Unlock()
+		mp.lk.RUnlock()
 
 		buckets := map[address.Address]*statBucket{}
 
@@ -20,17 +20,18 @@ const repubMsgLimit = 30
 var RepublishBatchDelay = 100 * time.Millisecond
 
 func (mp *MessagePool) republishPendingMessages(ctx context.Context) error {
-	mp.curTsLk.Lock()
+	mp.curTsLk.RLock()
 	ts := mp.curTs
 
 	baseFee, err := mp.api.ChainComputeBaseFee(context.TODO(), ts)
+	mp.curTsLk.RUnlock()
 	if err != nil {
-		mp.curTsLk.Unlock()
 		return xerrors.Errorf("computing basefee: %w", err)
 	}
 	baseFeeLowerBound := getBaseFeeLowerBound(baseFee, baseFeeLowerBoundFactor)
 
 	pending := make(map[address.Address]map[uint64]*types.SignedMessage)
+	mp.curTsLk.Lock()
 	mp.lk.Lock()
 	mp.republished = nil // clear this to avoid races triggering an early republish
 	mp.forEachLocal(ctx, func(ctx context.Context, actor address.Address) {
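republishPendingMessages now reads the head and computes the base fee under a read lock, drops that lock before the error check (so the error path no longer needs its own unlock), and takes the exclusive locks only later, around the section that mutates republished and pending state. A self-contained sketch of that restructuring, with hypothetical names:

package main

import (
	"errors"
	"fmt"
	"sync"
)

type tipset struct{ height int }

type pool struct {
	curTsLk     sync.RWMutex
	curTs       *tipset
	lk          sync.RWMutex
	republished map[string]struct{}
}

// stand-in for the base fee computation against a tipset
func computeBaseFee(ts *tipset) (int, error) {
	if ts == nil {
		return 0, errors.New("no tipset")
	}
	return 100 + ts.height, nil
}

func (p *pool) republishPending() error {
	p.curTsLk.RLock()
	ts := p.curTs
	baseFee, err := computeBaseFee(ts)
	p.curTsLk.RUnlock() // released before branching, so the error path needs no unlock
	if err != nil {
		return fmt.Errorf("computing basefee: %w", err)
	}

	// exclusive locks only around the part that actually mutates shared state
	p.curTsLk.Lock()
	p.lk.Lock()
	p.republished = nil // clear to avoid races triggering an early republish
	p.lk.Unlock()
	p.curTsLk.Unlock()

	fmt.Println("republishing with base fee", baseFee)
	return nil
}

func main() {
	p := &pool{curTs: &tipset{height: 10}}
	if err := p.republishPending(); err != nil {
		panic(err)
	}
}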
@@ -43,6 +43,7 @@ func (mp *MessagePool) SelectMessages(ctx context.Context, ts *types.TipSet, tq
 	mp.curTsLk.Lock()
 	defer mp.curTsLk.Unlock()
 
+	//TODO confirm if we can switch to RLock here for performance
 	mp.lk.Lock()
 	defer mp.lk.Unlock()
 