Merge pull request #3000 from filecoin-project/feat/mpool-add-semaphore
Add semaphore in push and friends to reduce lock contention
This commit is contained in:
commit
43586ed9a1
@ -58,6 +58,8 @@ var (
|
||||
|
||||
ErrBroadcastAnyway = errors.New("broadcasting message despite validation fail")
|
||||
ErrRBFTooLowPremium = errors.New("replace by fee has too low GasPremium")
|
||||
|
||||
ErrTryAgain = errors.New("state inconsistency while signing message; please try again")
|
||||
)
|
||||
|
||||
const (
|
||||
@ -71,6 +73,8 @@ type MessagePool struct {
|
||||
|
||||
ds dtypes.MetadataDS
|
||||
|
||||
addSema chan struct{}
|
||||
|
||||
closer chan struct{}
|
||||
repubTk *clock.Ticker
|
||||
|
||||
@ -159,6 +163,7 @@ func New(api Provider, ds dtypes.MetadataDS, netName dtypes.NetworkName) (*Messa
|
||||
|
||||
mp := &MessagePool{
|
||||
ds: ds,
|
||||
addSema: make(chan struct{}, 1),
|
||||
closer: make(chan struct{}),
|
||||
repubTk: build.Clock.Ticker(RepublishInterval),
|
||||
localAddrs: make(map[address.Address]struct{}),
|
||||
@ -250,6 +255,12 @@ func (mp *MessagePool) verifyMsgBeforePush(m *types.SignedMessage, epoch abi.Cha
|
||||
}
|
||||
|
||||
func (mp *MessagePool) Push(m *types.SignedMessage) (cid.Cid, error) {
|
||||
// serialize push access to reduce lock contention
|
||||
mp.addSema <- struct{}{}
|
||||
defer func() {
|
||||
<-mp.addSema
|
||||
}()
|
||||
|
||||
mp.curTsLk.Lock()
|
||||
epoch := mp.curTs.Height()
|
||||
mp.curTsLk.Unlock()
|
||||
@ -295,6 +306,12 @@ func (mp *MessagePool) Add(m *types.SignedMessage) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// serialize push access to reduce lock contention
|
||||
mp.addSema <- struct{}{}
|
||||
defer func() {
|
||||
<-mp.addSema
|
||||
}()
|
||||
|
||||
mp.curTsLk.Lock()
|
||||
defer mp.curTsLk.Unlock()
|
||||
return mp.addTs(m, mp.curTs)
|
||||
@ -493,31 +510,64 @@ func (mp *MessagePool) getStateBalance(addr address.Address, ts *types.TipSet) (
|
||||
}
|
||||
|
||||
func (mp *MessagePool) PushWithNonce(ctx context.Context, addr address.Address, cb func(address.Address, uint64) (*types.SignedMessage, error)) (*types.SignedMessage, error) {
|
||||
mp.curTsLk.Lock()
|
||||
defer mp.curTsLk.Unlock()
|
||||
// serialize push access to reduce lock contention
|
||||
mp.addSema <- struct{}{}
|
||||
defer func() {
|
||||
<-mp.addSema
|
||||
}()
|
||||
|
||||
mp.curTsLk.Lock()
|
||||
mp.lk.Lock()
|
||||
defer mp.lk.Unlock()
|
||||
|
||||
curTs := mp.curTs
|
||||
|
||||
fromKey := addr
|
||||
if fromKey.Protocol() == address.ID {
|
||||
var err error
|
||||
fromKey, err = mp.api.StateAccountKey(ctx, fromKey, mp.curTs)
|
||||
if err != nil {
|
||||
mp.lk.Unlock()
|
||||
mp.curTsLk.Unlock()
|
||||
return nil, xerrors.Errorf("resolving sender key: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
nonce, err := mp.getNonceLocked(fromKey, mp.curTs)
|
||||
if err != nil {
|
||||
mp.lk.Unlock()
|
||||
mp.curTsLk.Unlock()
|
||||
return nil, xerrors.Errorf("get nonce locked failed: %w", err)
|
||||
}
|
||||
|
||||
// release the locks for signing
|
||||
mp.lk.Unlock()
|
||||
mp.curTsLk.Unlock()
|
||||
|
||||
msg, err := cb(fromKey, nonce)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// reacquire the locks and check state for consistency
|
||||
mp.curTsLk.Lock()
|
||||
defer mp.curTsLk.Unlock()
|
||||
|
||||
if mp.curTs != curTs {
|
||||
return nil, ErrTryAgain
|
||||
}
|
||||
|
||||
mp.lk.Lock()
|
||||
defer mp.lk.Unlock()
|
||||
|
||||
nonce2, err := mp.getNonceLocked(fromKey, mp.curTs)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("get nonce locked failed: %w", err)
|
||||
}
|
||||
|
||||
if nonce2 != nonce {
|
||||
return nil, ErrTryAgain
|
||||
}
|
||||
|
||||
if err := mp.verifyMsgBeforePush(msg, mp.curTs.Height()); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -148,7 +148,7 @@ func (a *MpoolAPI) MpoolPushMessage(ctx context.Context, msg *types.Message) (*t
|
||||
msg.GasFeeCap = big.Add(feeCap, msg.GasPremium)
|
||||
}
|
||||
|
||||
return a.Mpool.PushWithNonce(ctx, msg.From, func(from address.Address, nonce uint64) (*types.SignedMessage, error) {
|
||||
sign := func(from address.Address, nonce uint64) (*types.SignedMessage, error) {
|
||||
msg.Nonce = nonce
|
||||
if msg.From.Protocol() == address.ID {
|
||||
log.Warnf("Push from ID address (%s), adjusting to %s", msg.From, from)
|
||||
@ -165,7 +165,17 @@ func (a *MpoolAPI) MpoolPushMessage(ctx context.Context, msg *types.Message) (*t
|
||||
}
|
||||
|
||||
return a.WalletSignMessage(ctx, from, msg)
|
||||
})
|
||||
}
|
||||
|
||||
var m *types.SignedMessage
|
||||
var err error
|
||||
again:
|
||||
m, err = a.Mpool.PushWithNonce(ctx, msg.From, sign)
|
||||
if err == messagepool.ErrTryAgain {
|
||||
log.Debugf("temporary failure while pushing message: %s; retrying", err)
|
||||
goto again
|
||||
}
|
||||
return m, err
|
||||
}
|
||||
|
||||
func (a *MpoolAPI) MpoolGetNonce(ctx context.Context, addr address.Address) (uint64, error) {
|
||||
|
Loading…
Reference in New Issue
Block a user