From 7be18df6ea70674a8b23d06816b68ffa26d71498 Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 12 Aug 2020 10:38:40 +0300 Subject: [PATCH 1/3] add semaphore in push and friends to reduce lock contention --- chain/messagepool/messagepool.go | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/chain/messagepool/messagepool.go b/chain/messagepool/messagepool.go index c18f5f5ae..b8e2bd324 100644 --- a/chain/messagepool/messagepool.go +++ b/chain/messagepool/messagepool.go @@ -68,6 +68,8 @@ type MessagePool struct { ds dtypes.MetadataDS + addSema chan struct{} + closer chan struct{} repubTk *clock.Ticker @@ -158,6 +160,7 @@ func New(api Provider, ds dtypes.MetadataDS, netName dtypes.NetworkName) (*Messa mp := &MessagePool{ ds: ds, + addSema: make(chan struct{}, 1), closer: make(chan struct{}), repubTk: build.Clock.Ticker(RepublishInterval), localAddrs: make(map[address.Address]struct{}), @@ -251,6 +254,12 @@ func (mp *MessagePool) verifyMsgBeforePush(m *types.SignedMessage, epoch abi.Cha } func (mp *MessagePool) Push(m *types.SignedMessage) (cid.Cid, error) { + // serialize push access to reduce lock contention + mp.addSema <- struct{}{} + defer func() { + <-mp.addSema + }() + mp.curTsLk.Lock() epoch := mp.curTs.Height() mp.curTsLk.Unlock() @@ -296,6 +305,12 @@ func (mp *MessagePool) Add(m *types.SignedMessage) error { return err } + // serialize push access to reduce lock contention + mp.addSema <- struct{}{} + defer func() { + <-mp.addSema + }() + mp.curTsLk.Lock() defer mp.curTsLk.Unlock() return mp.addTs(m, mp.curTs) @@ -494,6 +509,12 @@ func (mp *MessagePool) getStateBalance(addr address.Address, ts *types.TipSet) ( } func (mp *MessagePool) PushWithNonce(ctx context.Context, addr address.Address, cb func(address.Address, uint64) (*types.SignedMessage, error)) (*types.SignedMessage, error) { + // serialize push access to reduce lock contention + mp.addSema <- struct{}{} + defer func() { + <-mp.addSema + }() + mp.curTsLk.Lock() defer mp.curTsLk.Unlock() From f2ac17845bbdbe080bdd5a9c86bb0b42e215635d Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 12 Aug 2020 20:26:58 +0300 Subject: [PATCH 2/3] relinquish the lock while signing in PushWithNonce --- chain/messagepool/messagepool.go | 35 +++++++++++++++++++++++++++++--- 1 file changed, 32 insertions(+), 3 deletions(-) diff --git a/chain/messagepool/messagepool.go b/chain/messagepool/messagepool.go index b8e2bd324..cf5314fce 100644 --- a/chain/messagepool/messagepool.go +++ b/chain/messagepool/messagepool.go @@ -55,6 +55,8 @@ var ( ErrBroadcastAnyway = errors.New("broadcasting message despite validation fail") ErrRBFTooLowPremium = errors.New("replace by fee has too low GasPremium") + + ErrTryAgain = errors.New("state inconsistency while signing message; please try again") ) const ( @@ -516,30 +518,57 @@ func (mp *MessagePool) PushWithNonce(ctx context.Context, addr address.Address, }() mp.curTsLk.Lock() - defer mp.curTsLk.Unlock() - mp.lk.Lock() - defer mp.lk.Unlock() + + curTs := mp.curTs fromKey := addr if fromKey.Protocol() == address.ID { var err error fromKey, err = mp.api.StateAccountKey(ctx, fromKey, mp.curTs) if err != nil { + mp.lk.Unlock() + mp.curTsLk.Unlock() return nil, xerrors.Errorf("resolving sender key: %w", err) } } nonce, err := mp.getNonceLocked(fromKey, mp.curTs) if err != nil { + mp.lk.Unlock() + mp.curTsLk.Unlock() return nil, xerrors.Errorf("get nonce locked failed: %w", err) } + // release the locks for signing + mp.lk.Unlock() + mp.curTsLk.Unlock() + msg, err := cb(fromKey, nonce) if err != nil { return nil, err } + // reacquire the locks and check state for consistency + mp.curTsLk.Lock() + defer mp.curTsLk.Unlock() + + if mp.curTs != curTs { + return nil, ErrTryAgain + } + + mp.lk.Lock() + defer mp.lk.Unlock() + + nonce2, err := mp.getNonceLocked(fromKey, mp.curTs) + if err != nil { + return nil, xerrors.Errorf("get nonce locked failed: %w", err) + } + + if nonce2 != nonce { + return nil, ErrTryAgain + } + if err := mp.verifyMsgBeforePush(msg, mp.curTs.Height()); err != nil { return nil, err } From 43b30a80c19ecc4dda50e81570bf31c9cf37548f Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 12 Aug 2020 20:32:41 +0300 Subject: [PATCH 3/3] retry PushWithNonce if it fails with ErrTryAgain --- node/impl/full/mpool.go | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/node/impl/full/mpool.go b/node/impl/full/mpool.go index cd6adef6d..33478a176 100644 --- a/node/impl/full/mpool.go +++ b/node/impl/full/mpool.go @@ -148,7 +148,7 @@ func (a *MpoolAPI) MpoolPushMessage(ctx context.Context, msg *types.Message) (*t msg.GasFeeCap = feeCap } - return a.Mpool.PushWithNonce(ctx, msg.From, func(from address.Address, nonce uint64) (*types.SignedMessage, error) { + sign := func(from address.Address, nonce uint64) (*types.SignedMessage, error) { msg.Nonce = nonce if msg.From.Protocol() == address.ID { log.Warnf("Push from ID address (%s), adjusting to %s", msg.From, from) @@ -165,7 +165,17 @@ func (a *MpoolAPI) MpoolPushMessage(ctx context.Context, msg *types.Message) (*t } return a.WalletSignMessage(ctx, from, msg) - }) + } + + var m *types.SignedMessage + var err error +again: + m, err = a.Mpool.PushWithNonce(ctx, msg.From, sign) + if err == messagepool.ErrTryAgain { + log.Debugf("temporary failure while pushing message: %s; retrying", err) + goto again + } + return m, err } func (a *MpoolAPI) MpoolGetNonce(ctx context.Context, addr address.Address) (uint64, error) {