don't immediately publish messages that cannot be included in the next 20 blocks

This commit is contained in:
vyzo 2020-09-06 13:48:26 +03:00
parent 2e75d9c80a
commit 5659faf7f0

View File

@ -363,12 +363,12 @@ func (mp *MessagePool) addLocal(m *types.SignedMessage, msgb []byte) error {
return nil return nil
} }
func (mp *MessagePool) verifyMsgBeforeAdd(m *types.SignedMessage, curTs *types.TipSet, local bool) error { func (mp *MessagePool) verifyMsgBeforeAdd(m *types.SignedMessage, curTs *types.TipSet, local bool) (bool, error) {
epoch := curTs.Height() epoch := curTs.Height()
minGas := vm.PricelistByEpoch(epoch).OnChainMessage(m.ChainLength()) minGas := vm.PricelistByEpoch(epoch).OnChainMessage(m.ChainLength())
if err := m.VMMessage().ValidForBlockInclusion(minGas.Total()); err != nil { if err := m.VMMessage().ValidForBlockInclusion(minGas.Total()); err != nil {
return xerrors.Errorf("message will not be included in a block: %w", err) return false, xerrors.Errorf("message will not be included in a block: %w", err)
} }
// this checks if the GasFeeCap is suffisciently high for inclusion in the next 20 blocks // this checks if the GasFeeCap is suffisciently high for inclusion in the next 20 blocks
@ -376,18 +376,25 @@ func (mp *MessagePool) verifyMsgBeforeAdd(m *types.SignedMessage, curTs *types.T
// on republish to push it through later, if the baseFee has fallen. // on republish to push it through later, if the baseFee has fallen.
// this is a defensive check that stops minimum baseFee spam attacks from overloading validation // this is a defensive check that stops minimum baseFee spam attacks from overloading validation
// queues. // queues.
// Note that we don't do that for local messages, so that they can be accepted and republished // Note that for local messages, we always add them so that they can be accepted and republished
// automatically // automatically.
if !local && len(curTs.Blocks()) > 0 { publish := local
if len(curTs.Blocks()) > 0 {
baseFee := curTs.Blocks()[0].ParentBaseFee baseFee := curTs.Blocks()[0].ParentBaseFee
baseFeeLowerBound := types.BigDiv(baseFee, baseFeeLowerBoundFactor) baseFeeLowerBound := types.BigDiv(baseFee, baseFeeLowerBoundFactor)
if m.Message.GasFeeCap.LessThan(baseFeeLowerBound) { if m.Message.GasFeeCap.LessThan(baseFeeLowerBound) {
return xerrors.Errorf("GasFeeCap doesn't meet base fee lower bound for inclusion in the next 20 blocks (GasFeeCap: %s, baseFeeLowerBound: %s): %w", if local {
m.Message.GasFeeCap, baseFeeLowerBound, ErrSoftValidationFailure) log.Warnf("local message will not be immediately published because GasFeeCap doesn't meet the lower bound for inclusion in the next 20 blocks (GasFeeCap: %s, baseFeeLowerBound: %s)",
m.Message.GasFeeCap, baseFeeLowerBound)
publish = false
} else {
return false, xerrors.Errorf("GasFeeCap doesn't meet base fee lower bound for inclusion in the next 20 blocks (GasFeeCap: %s, baseFeeLowerBound: %s): %w",
m.Message.GasFeeCap, baseFeeLowerBound, ErrSoftValidationFailure)
}
} }
} }
return nil return publish, nil
} }
func (mp *MessagePool) Push(m *types.SignedMessage) (cid.Cid, error) { func (mp *MessagePool) Push(m *types.SignedMessage) (cid.Cid, error) {
@ -408,7 +415,8 @@ func (mp *MessagePool) Push(m *types.SignedMessage) (cid.Cid, error) {
} }
mp.curTsLk.Lock() mp.curTsLk.Lock()
if err := mp.addTs(m, mp.curTs, true); err != nil { publish, err := mp.addTs(m, mp.curTs, true)
if err != nil {
mp.curTsLk.Unlock() mp.curTsLk.Unlock()
return cid.Undef, err return cid.Undef, err
} }
@ -421,7 +429,11 @@ func (mp *MessagePool) Push(m *types.SignedMessage) (cid.Cid, error) {
} }
mp.lk.Unlock() mp.lk.Unlock()
return m.Cid(), mp.api.PubSubPublish(build.MessagesTopic(mp.netName), msgb) if publish {
err = mp.api.PubSubPublish(build.MessagesTopic(mp.netName), msgb)
}
return m.Cid(), err
} }
func (mp *MessagePool) checkMessage(m *types.SignedMessage) error { func (mp *MessagePool) checkMessage(m *types.SignedMessage) error {
@ -469,7 +481,9 @@ func (mp *MessagePool) Add(m *types.SignedMessage) error {
mp.curTsLk.Lock() mp.curTsLk.Lock()
defer mp.curTsLk.Unlock() defer mp.curTsLk.Unlock()
return mp.addTs(m, mp.curTs, false)
_, err = mp.addTs(m, mp.curTs, false)
return err
} }
func sigCacheKey(m *types.SignedMessage) (string, error) { func sigCacheKey(m *types.SignedMessage) (string, error) {
@ -536,28 +550,29 @@ func (mp *MessagePool) checkBalance(m *types.SignedMessage, curTs *types.TipSet)
return nil return nil
} }
func (mp *MessagePool) addTs(m *types.SignedMessage, curTs *types.TipSet, local bool) error { func (mp *MessagePool) addTs(m *types.SignedMessage, curTs *types.TipSet, local bool) (bool, error) {
snonce, err := mp.getStateNonce(m.Message.From, curTs) snonce, err := mp.getStateNonce(m.Message.From, curTs)
if err != nil { if err != nil {
return xerrors.Errorf("failed to look up actor state nonce: %s: %w", err, ErrSoftValidationFailure) return false, xerrors.Errorf("failed to look up actor state nonce: %s: %w", err, ErrSoftValidationFailure)
} }
if snonce > m.Message.Nonce { if snonce > m.Message.Nonce {
return xerrors.Errorf("minimum expected nonce is %d: %w", snonce, ErrNonceTooLow) return false, xerrors.Errorf("minimum expected nonce is %d: %w", snonce, ErrNonceTooLow)
} }
mp.lk.Lock() mp.lk.Lock()
defer mp.lk.Unlock() defer mp.lk.Unlock()
if err := mp.verifyMsgBeforeAdd(m, curTs, local); err != nil { publish, err := mp.verifyMsgBeforeAdd(m, curTs, local)
return err if err != nil {
return false, err
} }
if err := mp.checkBalance(m, curTs); err != nil { if err := mp.checkBalance(m, curTs); err != nil {
return err return false, err
} }
return mp.addLocked(m, true) return publish, mp.addLocked(m, true)
} }
func (mp *MessagePool) addLoaded(m *types.SignedMessage) error { func (mp *MessagePool) addLoaded(m *types.SignedMessage) error {
@ -583,7 +598,8 @@ func (mp *MessagePool) addLoaded(m *types.SignedMessage) error {
mp.lk.Lock() mp.lk.Lock()
defer mp.lk.Unlock() defer mp.lk.Unlock()
if err := mp.verifyMsgBeforeAdd(m, curTs, true); err != nil { _, err = mp.verifyMsgBeforeAdd(m, curTs, true)
if err != nil {
return err return err
} }
@ -769,7 +785,8 @@ func (mp *MessagePool) PushWithNonce(ctx context.Context, addr address.Address,
return nil, ErrTryAgain return nil, ErrTryAgain
} }
if err := mp.verifyMsgBeforeAdd(msg, curTs, true); err != nil { publish, err := mp.verifyMsgBeforeAdd(msg, curTs, true)
if err != nil {
return nil, err return nil, err
} }
@ -784,7 +801,11 @@ func (mp *MessagePool) PushWithNonce(ctx context.Context, addr address.Address,
log.Errorf("addLocal failed: %+v", err) log.Errorf("addLocal failed: %+v", err)
} }
return msg, mp.api.PubSubPublish(build.MessagesTopic(mp.netName), msgb) if publish {
err = mp.api.PubSubPublish(build.MessagesTopic(mp.netName), msgb)
}
return msg, err
} }
func (mp *MessagePool) Remove(from address.Address, nonce uint64, applied bool) { func (mp *MessagePool) Remove(from address.Address, nonce uint64, applied bool) {