fix deadlock in mpool.Push

This commit is contained in:
vyzo 2020-08-18 10:19:46 +03:00
parent 8cae101c04
commit 4287f76ed4

View File

@ -59,7 +59,7 @@ var (
ErrBroadcastAnyway = errors.New("broadcasting message despite validation fail") ErrBroadcastAnyway = errors.New("broadcasting message despite validation fail")
ErrRBFTooLowPremium = errors.New("replace by fee has too low GasPremium") ErrRBFTooLowPremium = errors.New("replace by fee has too low GasPremium")
ErrTryAgain = errors.New("state inconsistency while signing message; please try again") ErrTryAgain = errors.New("state inconsistency while pushing message; please try again")
) )
const ( const (
@ -264,6 +264,11 @@ func (mp *MessagePool) verifyMsgBeforePush(m *types.SignedMessage, epoch abi.Cha
} }
func (mp *MessagePool) Push(m *types.SignedMessage) (cid.Cid, error) { func (mp *MessagePool) Push(m *types.SignedMessage) (cid.Cid, error) {
err := mp.checkMessage(m)
if err != nil {
return cid.Undef, err
}
// serialize push access to reduce lock contention // serialize push access to reduce lock contention
mp.addSema <- struct{}{} mp.addSema <- struct{}{}
defer func() { defer func() {
@ -271,7 +276,8 @@ func (mp *MessagePool) Push(m *types.SignedMessage) (cid.Cid, error) {
}() }()
mp.curTsLk.Lock() mp.curTsLk.Lock()
epoch := mp.curTs.Height() curTs := mp.curTs
epoch := curTs.Height()
mp.curTsLk.Unlock() mp.curTsLk.Unlock()
if err := mp.verifyMsgBeforePush(m, epoch); err != nil { if err := mp.verifyMsgBeforePush(m, epoch); err != nil {
return cid.Undef, err return cid.Undef, err
@ -282,9 +288,17 @@ func (mp *MessagePool) Push(m *types.SignedMessage) (cid.Cid, error) {
return cid.Undef, err return cid.Undef, err
} }
if err := mp.Add(m); err != nil { mp.curTsLk.Lock()
if mp.curTs != curTs {
mp.curTsLk.Unlock()
return cid.Undef, ErrTryAgain
}
if err := mp.addTs(m, mp.curTs); err != nil {
mp.curTsLk.Unlock()
return cid.Undef, err return cid.Undef, err
} }
mp.curTsLk.Unlock()
mp.lk.Lock() mp.lk.Lock()
if err := mp.addLocal(m, msgb); err != nil { if err := mp.addLocal(m, msgb); err != nil {
@ -296,7 +310,7 @@ func (mp *MessagePool) Push(m *types.SignedMessage) (cid.Cid, error) {
return m.Cid(), mp.api.PubSubPublish(build.MessagesTopic(mp.netName), msgb) return m.Cid(), mp.api.PubSubPublish(build.MessagesTopic(mp.netName), msgb)
} }
func (mp *MessagePool) Add(m *types.SignedMessage) error { func (mp *MessagePool) checkMessage(m *types.SignedMessage) error {
// big messages are bad, anti DOS // big messages are bad, anti DOS
if m.Size() > 32*1024 { if m.Size() > 32*1024 {
return xerrors.Errorf("mpool message too large (%dB): %w", m.Size(), ErrMessageTooBig) return xerrors.Errorf("mpool message too large (%dB): %w", m.Size(), ErrMessageTooBig)
@ -315,6 +329,15 @@ func (mp *MessagePool) Add(m *types.SignedMessage) error {
return err return err
} }
return nil
}
func (mp *MessagePool) Add(m *types.SignedMessage) error {
err := mp.checkMessage(m)
if err != nil {
return err
}
// serialize push access to reduce lock contention // serialize push access to reduce lock contention
mp.addSema <- struct{}{} mp.addSema <- struct{}{}
defer func() { defer func() {