perf: mempool: lower priority optimizations (#10693)
* Release the curTsLk read lock earlier in chain/messagepool/repub.go, as it is not needed for ChainComputeBaseFee.
* chain/messagepool/selection.go: change to a read lock in SelectMessages.
* Tighten up locks in chain/messagepool/repub.go, and add two questions as comments on whether the curTsLk sections are needed.
* Include suggestion from @Jorropo to preallocate our msgs slice so that we only need to make a single allocation.
* mp.pending should not be accessed directly but through the getter.
* From @arajasek: just check whether the sender is a robust address (anything except an ID address is robust) in resolveToKey, and return early if so. That will be faster and reduce the size of this cache by half, because we can drop mp.keyCache.Add(ka, ka) on line 491.
* Do not need curTsLk; clean up code comments.
This commit is contained in:
parent 093d350df3
commit 742062f84c
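Most of the message-checking hunks below make the same change: direct reads of mp.pending are replaced with the getPendingMset getter, which returns (mset, ok, err), and the collected slice is preallocated to its known size. A minimal, self-contained sketch of that access pattern follows; the pool, msgSet, and message types here are illustrative stand-ins, not the real Lotus types.

package main

import (
	"context"
	"fmt"
)

// Illustrative stand-ins for the real message pool types.
type message struct{ Nonce uint64 }

type msgSet struct{ msgs map[uint64]*message }

type pool struct{ pending map[string]*msgSet }

// getPendingMset mirrors the getter-style access the diff switches to: callers
// receive (set, ok, err) instead of indexing the pending map directly, so any
// address resolution and locking can live in one place.
func (p *pool) getPendingMset(ctx context.Context, from string) (*msgSet, bool, error) {
	if err := ctx.Err(); err != nil {
		return nil, false, err
	}
	ms, ok := p.pending[from]
	return ms, ok, nil
}

func main() {
	p := &pool{pending: map[string]*msgSet{
		"t1sender": {msgs: map[uint64]*message{0: {Nonce: 0}, 1: {Nonce: 1}}},
	}}

	mset, ok, err := p.getPendingMset(context.Background(), "t1sender")
	if err != nil {
		fmt.Println("errored while getting pending mset:", err)
		return
	}
	if ok {
		// Preallocate to the known size so the appends below do a single
		// allocation, per the suggestion in the commit message.
		msgs := make([]*message, 0, len(mset.msgs))
		for _, m := range mset.msgs {
			msgs = append(msgs, m)
		}
		fmt.Println("collected", len(msgs), "pending messages")
	}
}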
@@ -33,8 +33,13 @@ func (mp *MessagePool) CheckMessages(ctx context.Context, protos []*api.MessageP
 func (mp *MessagePool) CheckPendingMessages(ctx context.Context, from address.Address) ([][]api.MessageCheckStatus, error) {
     var msgs []*types.Message
     mp.lk.RLock()
-    mset, ok := mp.pending[from]
+    mset, ok, err := mp.getPendingMset(ctx, from)
+    if err != nil {
+        log.Warnf("errored while getting pending mset: %w", err)
+        return nil, err
+    }
     if ok {
+        msgs = make([]*types.Message, 0, len(mset.msgs))
         for _, sm := range mset.msgs {
             msgs = append(msgs, &sm.Message)
         }
@@ -64,7 +69,11 @@ func (mp *MessagePool) CheckReplaceMessages(ctx context.Context, replace []*type
         if !ok {
             mmap = make(map[uint64]*types.Message)
             msgMap[m.From] = mmap
-            mset, ok := mp.pending[m.From]
+            mset, ok, err := mp.getPendingMset(ctx, m.From)
+            if err != nil {
+                log.Warnf("errored while getting pending mset: %w", err)
+                return nil, err
+            }
             if ok {
                 count += len(mset.msgs)
                 for _, sm := range mset.msgs {
@@ -144,7 +153,11 @@ func (mp *MessagePool) checkMessages(ctx context.Context, msgs []*types.Message,
         st, ok := state[m.From]
         if !ok {
             mp.lk.RLock()
-            mset, ok := mp.pending[m.From]
+            mset, ok, err := mp.getPendingMset(ctx, m.From)
+            if err != nil {
+                log.Warnf("errored while getting pending mset: %w", err)
+                return nil, err
+            }
             if ok && !interned {
                 st = &actorState{nextNonce: mset.nextNonce, requiredFunds: mset.requiredFunds}
                 for _, m := range mset.msgs {
@@ -474,6 +474,15 @@ func (mp *MessagePool) TryForEachPendingMessage(f func(cid.Cid) error) error {
 }
 
 func (mp *MessagePool) resolveToKey(ctx context.Context, addr address.Address) (address.Address, error) {
+    //if addr is not an ID addr, then it is already resolved to a key
+    if addr.Protocol() != address.ID {
+        return addr, nil
+    }
+    return mp.resolveToKeyFromID(ctx, addr)
+}
+
+func (mp *MessagePool) resolveToKeyFromID(ctx context.Context, addr address.Address) (address.Address, error) {
+
     // check the cache
     a, ok := mp.keyCache.Get(addr)
     if ok {
@@ -488,8 +497,6 @@ func (mp *MessagePool) resolveToKey(ctx context.Context, addr address.Address) (
 
     // place both entries in the cache (may both be key addresses, which is fine)
     mp.keyCache.Add(addr, ka)
-    mp.keyCache.Add(ka, ka)
 
     return ka, nil
 }
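For background on the resolveToKey fast path above: only ID addresses need to be resolved on chain, while every other protocol is already a key (robust) address, so those callers return immediately and never touch keyCache. A rough sketch of that check using the go-address package; the sample payload passed to NewSecp256k1Address is made up purely for illustration.

package main

import (
	"fmt"

	"github.com/filecoin-project/go-address"
)

// isRobust mirrors the fast path in resolveToKey above: anything that is not
// an ID address is already a key (robust) address and needs no resolution.
func isRobust(a address.Address) bool {
	return a.Protocol() != address.ID
}

func main() {
	idAddr, _ := address.NewIDAddress(1234)                                  // an f0/t0 style ID address
	keyAddr, _ := address.NewSecp256k1Address([]byte("illustrative pubkey")) // an f1/t1 style key address

	fmt.Println(isRobust(idAddr))  // false: must be resolved (and cached) to a key address
	fmt.Println(isRobust(keyAddr)) // true: returned as-is, never hits keyCache
}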
@@ -22,18 +22,21 @@ var RepublishBatchDelay = 100 * time.Millisecond
 func (mp *MessagePool) republishPendingMessages(ctx context.Context) error {
     mp.curTsLk.RLock()
     ts := mp.curTs
+    mp.curTsLk.RUnlock()
 
     baseFee, err := mp.api.ChainComputeBaseFee(context.TODO(), ts)
-    mp.curTsLk.RUnlock()
     if err != nil {
         return xerrors.Errorf("computing basefee: %w", err)
     }
     baseFeeLowerBound := getBaseFeeLowerBound(baseFee, baseFeeLowerBoundFactor)
 
     pending := make(map[address.Address]map[uint64]*types.SignedMessage)
-    mp.curTsLk.Lock()
+
     mp.lk.Lock()
     mp.republished = nil // clear this to avoid races triggering an early republish
+    mp.lk.Unlock()
+
+    mp.lk.RLock()
     mp.forEachLocal(ctx, func(ctx context.Context, actor address.Address) {
         mset, ok, err := mp.getPendingMset(ctx, actor)
         if err != nil {
@@ -54,9 +57,7 @@ func (mp *MessagePool) republishPendingMessages(ctx context.Context) error {
         }
         pending[actor] = pend
     })
-
-    mp.lk.Unlock()
-    mp.curTsLk.Unlock()
+    mp.lk.RUnlock()
 
     if len(pending) == 0 {
         return nil
@@ -177,8 +178,8 @@ loop:
         republished[m.Cid()] = struct{}{}
     }
 
-    mp.lk.Lock()
     // update the republished set so that we can trigger early republish from head changes
+    mp.lk.Lock()
     mp.republished = republished
     mp.lk.Unlock()
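The repub.go hunks above cover the "release the read lock earlier" and lock-tightening items: the current tipset is snapshotted under curTsLk and the lock is released before the potentially slow ChainComputeBaseFee call, and mp.lk is only write-locked for the brief moment needed to clear mp.republished. A generic sketch of the snapshot-then-release pattern with sync.RWMutex; the tipSet type and slowBaseFee helper are illustrative stand-ins, not the Lotus API.

package main

import (
	"fmt"
	"sync"
	"time"
)

// Illustrative stand-ins for the pool's current-tipset state.
type tipSet struct{ height int }

type pool struct {
	curTsLk sync.RWMutex
	curTs   *tipSet
}

// slowBaseFee stands in for an expensive chain call such as ChainComputeBaseFee.
func slowBaseFee(ts *tipSet) int {
	time.Sleep(10 * time.Millisecond)
	return 100 + ts.height
}

func (p *pool) republish() {
	// Snapshot the pointer under the read lock, then release it immediately:
	// the expensive call below only needs the snapshot, not the lock, so other
	// readers and writers are not blocked while it runs.
	p.curTsLk.RLock()
	ts := p.curTs
	p.curTsLk.RUnlock()

	fee := slowBaseFee(ts)
	fmt.Println("base fee computed at height", ts.height, "=>", fee)
}

func main() {
	p := &pool{curTs: &tipSet{height: 42}}
	p.republish()
}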
@@ -40,12 +40,11 @@ type msgChain struct {
 }
 
 func (mp *MessagePool) SelectMessages(ctx context.Context, ts *types.TipSet, tq float64) ([]*types.SignedMessage, error) {
-    mp.curTsLk.Lock()
-    defer mp.curTsLk.Unlock()
+    mp.curTsLk.RLock()
+    defer mp.curTsLk.RUnlock()
 
-    //TODO confirm if we can switch to RLock here for performance
-    mp.lk.Lock()
-    defer mp.lk.Unlock()
+    mp.lk.RLock()
+    defer mp.lk.RUnlock()
 
     // See if we need to prune before selection; excessive buildup can lead to slow selection,
     // so prune if we have too many messages (ignoring the cooldown).
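The SelectMessages hunk above swaps both mutexes from exclusive locks to read locks. With sync.RWMutex, any number of readers can hold the lock concurrently, so selection no longer serializes against other read-only message pool operations. A small sketch of why that matters:

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var lk sync.RWMutex
	var wg sync.WaitGroup

	start := time.Now()
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			// Read locks do not exclude one another, so these four 50ms
			// "selections" overlap instead of running back to back.
			lk.RLock()
			defer lk.RUnlock()
			time.Sleep(50 * time.Millisecond)
			fmt.Printf("reader %d done\n", i)
		}(i)
	}
	wg.Wait()
	// With lk.Lock()/lk.Unlock() instead, this would take roughly four times as long.
	fmt.Println("elapsed:", time.Since(start).Round(10*time.Millisecond))
}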