cache the tipset nonce calculation before holding the write lock, and also verify that the the calculation is cached after grabbing the write lock. if it is not cached, give up the lock, calculate, and then grab the write lock again

This commit is contained in:
Mikers 2023-03-28 08:11:00 +00:00
parent 6f440a6420
commit ad81cd18c2
2 changed files with 30 additions and 9 deletions

View File

@ -763,7 +763,28 @@ func (mp *MessagePool) Add(ctx context.Context, m *types.SignedMessage) error {
<-mp.addSema <-mp.addSema
}() }()
mp.curTsLk.Lock() //Ensure block calculation is cached without holding the write lock
mp.curTsLk.RLock()
tmpCurTs := mp.curTs
mp.curTsLk.RUnlock()
_, _ = mp.api.GetActorAfter(m.Message.From, tmpCurTs)
_, _ = mp.getStateNonce(ctx, m.Message.From, tmpCurTs)
//if the newly acquired Ts is not the one we just cached, let go of the lock, cache it and open the lock again and repeat....
for {
mp.curTsLk.Lock()
writeCurTs := mp.curTs
if writeCurTs == tmpCurTs {
break // we have this cached we can skip
}
mp.curTsLk.Unlock()
tmpCurTs = writeCurTs
_, _ = mp.api.GetActorAfter(m.Message.From, tmpCurTs)
_, _ = mp.getStateNonce(ctx, m.Message.From, tmpCurTs)
}
defer mp.curTsLk.Unlock() defer mp.curTsLk.Unlock()
_, err = mp.addTs(ctx, m, mp.curTs, false, false) _, err = mp.addTs(ctx, m, mp.curTs, false, false)
@ -852,14 +873,14 @@ func (mp *MessagePool) addTs(ctx context.Context, m *types.SignedMessage, curTs
return false, xerrors.Errorf("minimum expected nonce is %d: %w", snonce, ErrNonceTooLow) return false, xerrors.Errorf("minimum expected nonce is %d: %w", snonce, ErrNonceTooLow)
} }
mp.lk.Lock()
defer mp.lk.Unlock()
senderAct, err := mp.api.GetActorAfter(m.Message.From, curTs) senderAct, err := mp.api.GetActorAfter(m.Message.From, curTs)
if err != nil { if err != nil {
return false, xerrors.Errorf("failed to get sender actor: %w", err) return false, xerrors.Errorf("failed to get sender actor: %w", err)
} }
mp.lk.Lock()
defer mp.lk.Unlock()
// This message can only be included in the _next_ epoch and beyond, hence the +1. // This message can only be included in the _next_ epoch and beyond, hence the +1.
epoch := curTs.Height() + 1 epoch := curTs.Height() + 1
nv := mp.api.StateNetworkVersion(ctx, epoch) nv := mp.api.StateNetworkVersion(ctx, epoch)

View File

@ -20,18 +20,18 @@ const repubMsgLimit = 30
var RepublishBatchDelay = 100 * time.Millisecond var RepublishBatchDelay = 100 * time.Millisecond
func (mp *MessagePool) republishPendingMessages(ctx context.Context) error { func (mp *MessagePool) republishPendingMessages(ctx context.Context) error {
mp.curTsLk.Lock() mp.curTsLk.RLock()
ts := mp.curTs ts := mp.curTs
baseFee, err := mp.api.ChainComputeBaseFee(context.TODO(), ts) baseFee, err := mp.api.ChainComputeBaseFee(context.TODO(), ts)
if err != nil { if err != nil {
mp.curTsLk.Unlock() mp.curTsLk.RUnlock()
return xerrors.Errorf("computing basefee: %w", err) return xerrors.Errorf("computing basefee: %w", err)
} }
baseFeeLowerBound := getBaseFeeLowerBound(baseFee, baseFeeLowerBoundFactor) baseFeeLowerBound := getBaseFeeLowerBound(baseFee, baseFeeLowerBoundFactor)
pending := make(map[address.Address]map[uint64]*types.SignedMessage) pending := make(map[address.Address]map[uint64]*types.SignedMessage)
mp.lk.Lock() mp.lk.RLock()
mp.republished = nil // clear this to avoid races triggering an early republish mp.republished = nil // clear this to avoid races triggering an early republish
mp.forEachLocal(ctx, func(ctx context.Context, actor address.Address) { mp.forEachLocal(ctx, func(ctx context.Context, actor address.Address) {
mset, ok, err := mp.getPendingMset(ctx, actor) mset, ok, err := mp.getPendingMset(ctx, actor)
@ -54,8 +54,8 @@ func (mp *MessagePool) republishPendingMessages(ctx context.Context) error {
pending[actor] = pend pending[actor] = pend
}) })
mp.lk.Unlock() mp.lk.RUnlock()
mp.curTsLk.Unlock() mp.curTsLk.RUnlock()
if len(pending) == 0 { if len(pending) == 0 {
return nil return nil