Merge pull request #3139 from filecoin-project/fix/mpool-push-deadlock
fix deadlock in mpool.Push
This commit is contained in:
commit
1ba5dd4c72
@ -59,7 +59,7 @@ var (
|
||||
ErrBroadcastAnyway = errors.New("broadcasting message despite validation fail")
|
||||
ErrRBFTooLowPremium = errors.New("replace by fee has too low GasPremium")
|
||||
|
||||
ErrTryAgain = errors.New("state inconsistency while signing message; please try again")
|
||||
ErrTryAgain = errors.New("state inconsistency while pushing message; please try again")
|
||||
)
|
||||
|
||||
const (
|
||||
@ -264,6 +264,11 @@ func (mp *MessagePool) verifyMsgBeforePush(m *types.SignedMessage, epoch abi.Cha
|
||||
}
|
||||
|
||||
func (mp *MessagePool) Push(m *types.SignedMessage) (cid.Cid, error) {
|
||||
err := mp.checkMessage(m)
|
||||
if err != nil {
|
||||
return cid.Undef, err
|
||||
}
|
||||
|
||||
// serialize push access to reduce lock contention
|
||||
mp.addSema <- struct{}{}
|
||||
defer func() {
|
||||
@ -271,7 +276,8 @@ func (mp *MessagePool) Push(m *types.SignedMessage) (cid.Cid, error) {
|
||||
}()
|
||||
|
||||
mp.curTsLk.Lock()
|
||||
epoch := mp.curTs.Height()
|
||||
curTs := mp.curTs
|
||||
epoch := curTs.Height()
|
||||
mp.curTsLk.Unlock()
|
||||
if err := mp.verifyMsgBeforePush(m, epoch); err != nil {
|
||||
return cid.Undef, err
|
||||
@ -282,9 +288,17 @@ func (mp *MessagePool) Push(m *types.SignedMessage) (cid.Cid, error) {
|
||||
return cid.Undef, err
|
||||
}
|
||||
|
||||
if err := mp.Add(m); err != nil {
|
||||
mp.curTsLk.Lock()
|
||||
if mp.curTs != curTs {
|
||||
mp.curTsLk.Unlock()
|
||||
return cid.Undef, ErrTryAgain
|
||||
}
|
||||
|
||||
if err := mp.addTs(m, mp.curTs); err != nil {
|
||||
mp.curTsLk.Unlock()
|
||||
return cid.Undef, err
|
||||
}
|
||||
mp.curTsLk.Unlock()
|
||||
|
||||
mp.lk.Lock()
|
||||
if err := mp.addLocal(m, msgb); err != nil {
|
||||
@ -296,7 +310,7 @@ func (mp *MessagePool) Push(m *types.SignedMessage) (cid.Cid, error) {
|
||||
return m.Cid(), mp.api.PubSubPublish(build.MessagesTopic(mp.netName), msgb)
|
||||
}
|
||||
|
||||
func (mp *MessagePool) Add(m *types.SignedMessage) error {
|
||||
func (mp *MessagePool) checkMessage(m *types.SignedMessage) error {
|
||||
// big messages are bad, anti DOS
|
||||
if m.Size() > 32*1024 {
|
||||
return xerrors.Errorf("mpool message too large (%dB): %w", m.Size(), ErrMessageTooBig)
|
||||
@ -315,6 +329,15 @@ func (mp *MessagePool) Add(m *types.SignedMessage) error {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mp *MessagePool) Add(m *types.SignedMessage) error {
|
||||
err := mp.checkMessage(m)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// serialize push access to reduce lock contention
|
||||
mp.addSema <- struct{}{}
|
||||
defer func() {
|
||||
|
Loading…
Reference in New Issue
Block a user