Merge pull request #3592 from filecoin-project/feat/improved-publish-logic
Improve publish/republish logic
This commit is contained in:
commit
3398426293
@ -45,7 +45,7 @@ var rbfDenomBig = types.NewInt(RbfDenom)
|
|||||||
|
|
||||||
const RbfDenom = 256
|
const RbfDenom = 256
|
||||||
|
|
||||||
var RepublishInterval = pubsub.TimeCacheDuration + time.Duration(5*build.BlockDelaySecs+build.PropagationDelaySecs)*time.Second
|
var RepublishInterval = time.Duration(10*build.BlockDelaySecs+build.PropagationDelaySecs) * time.Second
|
||||||
|
|
||||||
var minimumBaseFee = types.NewInt(uint64(build.MinimumBaseFee))
|
var minimumBaseFee = types.NewInt(uint64(build.MinimumBaseFee))
|
||||||
var baseFeeLowerBoundFactor = types.NewInt(10)
|
var baseFeeLowerBoundFactor = types.NewInt(10)
|
||||||
@ -81,6 +81,14 @@ const (
|
|||||||
localUpdates = "update"
|
localUpdates = "update"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
// if the republish interval is too short compared to the pubsub timecache, adjust it
|
||||||
|
minInterval := pubsub.TimeCacheDuration + time.Duration(build.PropagationDelaySecs)
|
||||||
|
if RepublishInterval < minInterval {
|
||||||
|
RepublishInterval = minInterval
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
type MessagePool struct {
|
type MessagePool struct {
|
||||||
lk sync.Mutex
|
lk sync.Mutex
|
||||||
|
|
||||||
@ -355,12 +363,22 @@ func (mp *MessagePool) addLocal(m *types.SignedMessage, msgb []byte) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mp *MessagePool) verifyMsgBeforeAdd(m *types.SignedMessage, curTs *types.TipSet, local bool) error {
|
// verifyMsgBeforeAdd verifies that the message meets the minimum criteria for block inclusio
|
||||||
|
// and whether the message has enough funds to be included in the next 20 blocks.
|
||||||
|
// If the message is not valid for block inclusion, it returns an error.
|
||||||
|
// For local messages, if the message can be included in the next 20 blocks, it returns true to
|
||||||
|
// signal that it should be immediately published. If the message cannot be included in the next 20
|
||||||
|
// blocks, it returns false so that the message doesn't immediately get published (and ignored by our
|
||||||
|
// peers); instead it will be published through the republish loop, once the base fee has fallen
|
||||||
|
// sufficiently.
|
||||||
|
// For non local messages, if the message cannot be included in the next 20 blocks it returns
|
||||||
|
// a (soft) validation error.
|
||||||
|
func (mp *MessagePool) verifyMsgBeforeAdd(m *types.SignedMessage, curTs *types.TipSet, local bool) (bool, error) {
|
||||||
epoch := curTs.Height()
|
epoch := curTs.Height()
|
||||||
minGas := vm.PricelistByEpoch(epoch).OnChainMessage(m.ChainLength())
|
minGas := vm.PricelistByEpoch(epoch).OnChainMessage(m.ChainLength())
|
||||||
|
|
||||||
if err := m.VMMessage().ValidForBlockInclusion(minGas.Total()); err != nil {
|
if err := m.VMMessage().ValidForBlockInclusion(minGas.Total()); err != nil {
|
||||||
return xerrors.Errorf("message will not be included in a block: %w", err)
|
return false, xerrors.Errorf("message will not be included in a block: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// this checks if the GasFeeCap is suffisciently high for inclusion in the next 20 blocks
|
// this checks if the GasFeeCap is suffisciently high for inclusion in the next 20 blocks
|
||||||
@ -368,18 +386,25 @@ func (mp *MessagePool) verifyMsgBeforeAdd(m *types.SignedMessage, curTs *types.T
|
|||||||
// on republish to push it through later, if the baseFee has fallen.
|
// on republish to push it through later, if the baseFee has fallen.
|
||||||
// this is a defensive check that stops minimum baseFee spam attacks from overloading validation
|
// this is a defensive check that stops minimum baseFee spam attacks from overloading validation
|
||||||
// queues.
|
// queues.
|
||||||
// Note that we don't do that for local messages, so that they can be accepted and republished
|
// Note that for local messages, we always add them so that they can be accepted and republished
|
||||||
// automatically
|
// automatically.
|
||||||
if !local && len(curTs.Blocks()) > 0 {
|
publish := local
|
||||||
|
if len(curTs.Blocks()) > 0 {
|
||||||
baseFee := curTs.Blocks()[0].ParentBaseFee
|
baseFee := curTs.Blocks()[0].ParentBaseFee
|
||||||
baseFeeLowerBound := types.BigDiv(baseFee, baseFeeLowerBoundFactor)
|
baseFeeLowerBound := types.BigDiv(baseFee, baseFeeLowerBoundFactor)
|
||||||
if m.Message.GasFeeCap.LessThan(baseFeeLowerBound) {
|
if m.Message.GasFeeCap.LessThan(baseFeeLowerBound) {
|
||||||
return xerrors.Errorf("GasFeeCap doesn't meet base fee lower bound for inclusion in the next 20 blocks (GasFeeCap: %s, baseFeeLowerBound: %s): %w",
|
if local {
|
||||||
|
log.Warnf("local message will not be immediately published because GasFeeCap doesn't meet the lower bound for inclusion in the next 20 blocks (GasFeeCap: %s, baseFeeLowerBound: %s)",
|
||||||
|
m.Message.GasFeeCap, baseFeeLowerBound)
|
||||||
|
publish = false
|
||||||
|
} else {
|
||||||
|
return false, xerrors.Errorf("GasFeeCap doesn't meet base fee lower bound for inclusion in the next 20 blocks (GasFeeCap: %s, baseFeeLowerBound: %s): %w",
|
||||||
m.Message.GasFeeCap, baseFeeLowerBound, ErrSoftValidationFailure)
|
m.Message.GasFeeCap, baseFeeLowerBound, ErrSoftValidationFailure)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return publish, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mp *MessagePool) Push(m *types.SignedMessage) (cid.Cid, error) {
|
func (mp *MessagePool) Push(m *types.SignedMessage) (cid.Cid, error) {
|
||||||
@ -400,7 +425,8 @@ func (mp *MessagePool) Push(m *types.SignedMessage) (cid.Cid, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
mp.curTsLk.Lock()
|
mp.curTsLk.Lock()
|
||||||
if err := mp.addTs(m, mp.curTs, true); err != nil {
|
publish, err := mp.addTs(m, mp.curTs, true)
|
||||||
|
if err != nil {
|
||||||
mp.curTsLk.Unlock()
|
mp.curTsLk.Unlock()
|
||||||
return cid.Undef, err
|
return cid.Undef, err
|
||||||
}
|
}
|
||||||
@ -413,7 +439,11 @@ func (mp *MessagePool) Push(m *types.SignedMessage) (cid.Cid, error) {
|
|||||||
}
|
}
|
||||||
mp.lk.Unlock()
|
mp.lk.Unlock()
|
||||||
|
|
||||||
return m.Cid(), mp.api.PubSubPublish(build.MessagesTopic(mp.netName), msgb)
|
if publish {
|
||||||
|
err = mp.api.PubSubPublish(build.MessagesTopic(mp.netName), msgb)
|
||||||
|
}
|
||||||
|
|
||||||
|
return m.Cid(), err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mp *MessagePool) checkMessage(m *types.SignedMessage) error {
|
func (mp *MessagePool) checkMessage(m *types.SignedMessage) error {
|
||||||
@ -461,7 +491,9 @@ func (mp *MessagePool) Add(m *types.SignedMessage) error {
|
|||||||
|
|
||||||
mp.curTsLk.Lock()
|
mp.curTsLk.Lock()
|
||||||
defer mp.curTsLk.Unlock()
|
defer mp.curTsLk.Unlock()
|
||||||
return mp.addTs(m, mp.curTs, false)
|
|
||||||
|
_, err = mp.addTs(m, mp.curTs, false)
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func sigCacheKey(m *types.SignedMessage) (string, error) {
|
func sigCacheKey(m *types.SignedMessage) (string, error) {
|
||||||
@ -528,28 +560,29 @@ func (mp *MessagePool) checkBalance(m *types.SignedMessage, curTs *types.TipSet)
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mp *MessagePool) addTs(m *types.SignedMessage, curTs *types.TipSet, local bool) error {
|
func (mp *MessagePool) addTs(m *types.SignedMessage, curTs *types.TipSet, local bool) (bool, error) {
|
||||||
snonce, err := mp.getStateNonce(m.Message.From, curTs)
|
snonce, err := mp.getStateNonce(m.Message.From, curTs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("failed to look up actor state nonce: %s: %w", err, ErrSoftValidationFailure)
|
return false, xerrors.Errorf("failed to look up actor state nonce: %s: %w", err, ErrSoftValidationFailure)
|
||||||
}
|
}
|
||||||
|
|
||||||
if snonce > m.Message.Nonce {
|
if snonce > m.Message.Nonce {
|
||||||
return xerrors.Errorf("minimum expected nonce is %d: %w", snonce, ErrNonceTooLow)
|
return false, xerrors.Errorf("minimum expected nonce is %d: %w", snonce, ErrNonceTooLow)
|
||||||
}
|
}
|
||||||
|
|
||||||
mp.lk.Lock()
|
mp.lk.Lock()
|
||||||
defer mp.lk.Unlock()
|
defer mp.lk.Unlock()
|
||||||
|
|
||||||
if err := mp.verifyMsgBeforeAdd(m, curTs, local); err != nil {
|
publish, err := mp.verifyMsgBeforeAdd(m, curTs, local)
|
||||||
return err
|
if err != nil {
|
||||||
|
return false, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := mp.checkBalance(m, curTs); err != nil {
|
if err := mp.checkBalance(m, curTs); err != nil {
|
||||||
return err
|
return false, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return mp.addLocked(m, true)
|
return publish, mp.addLocked(m, true)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mp *MessagePool) addLoaded(m *types.SignedMessage) error {
|
func (mp *MessagePool) addLoaded(m *types.SignedMessage) error {
|
||||||
@ -575,7 +608,8 @@ func (mp *MessagePool) addLoaded(m *types.SignedMessage) error {
|
|||||||
mp.lk.Lock()
|
mp.lk.Lock()
|
||||||
defer mp.lk.Unlock()
|
defer mp.lk.Unlock()
|
||||||
|
|
||||||
if err := mp.verifyMsgBeforeAdd(m, curTs, true); err != nil {
|
_, err = mp.verifyMsgBeforeAdd(m, curTs, true)
|
||||||
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -761,7 +795,8 @@ func (mp *MessagePool) PushWithNonce(ctx context.Context, addr address.Address,
|
|||||||
return nil, ErrTryAgain
|
return nil, ErrTryAgain
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := mp.verifyMsgBeforeAdd(msg, curTs, true); err != nil {
|
publish, err := mp.verifyMsgBeforeAdd(msg, curTs, true)
|
||||||
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -776,7 +811,11 @@ func (mp *MessagePool) PushWithNonce(ctx context.Context, addr address.Address,
|
|||||||
log.Errorf("addLocal failed: %+v", err)
|
log.Errorf("addLocal failed: %+v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return msg, mp.api.PubSubPublish(build.MessagesTopic(mp.netName), msgb)
|
if publish {
|
||||||
|
err = mp.api.PubSubPublish(build.MessagesTopic(mp.netName), msgb)
|
||||||
|
}
|
||||||
|
|
||||||
|
return msg, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mp *MessagePool) Remove(from address.Address, nonce uint64, applied bool) {
|
func (mp *MessagePool) Remove(from address.Address, nonce uint64, applied bool) {
|
||||||
|
@ -16,7 +16,7 @@ import (
|
|||||||
|
|
||||||
const repubMsgLimit = 30
|
const repubMsgLimit = 30
|
||||||
|
|
||||||
var RepublishBatchDelay = 200 * time.Millisecond
|
var RepublishBatchDelay = 100 * time.Millisecond
|
||||||
|
|
||||||
func (mp *MessagePool) republishPendingMessages() error {
|
func (mp *MessagePool) republishPendingMessages() error {
|
||||||
mp.curTsLk.Lock()
|
mp.curTsLk.Lock()
|
||||||
@ -27,6 +27,7 @@ func (mp *MessagePool) republishPendingMessages() error {
|
|||||||
mp.curTsLk.Unlock()
|
mp.curTsLk.Unlock()
|
||||||
return xerrors.Errorf("computing basefee: %w", err)
|
return xerrors.Errorf("computing basefee: %w", err)
|
||||||
}
|
}
|
||||||
|
baseFeeLowerBound := types.BigDiv(baseFee, baseFeeLowerBoundFactor)
|
||||||
|
|
||||||
pending := make(map[address.Address]map[uint64]*types.SignedMessage)
|
pending := make(map[address.Address]map[uint64]*types.SignedMessage)
|
||||||
mp.lk.Lock()
|
mp.lk.Lock()
|
||||||
@ -55,7 +56,11 @@ func (mp *MessagePool) republishPendingMessages() error {
|
|||||||
|
|
||||||
var chains []*msgChain
|
var chains []*msgChain
|
||||||
for actor, mset := range pending {
|
for actor, mset := range pending {
|
||||||
next := mp.createMessageChains(actor, mset, baseFee, ts)
|
// We use the baseFee lower bound for createChange so that we optimistically include
|
||||||
|
// chains that might become profitable in the next 20 blocks.
|
||||||
|
// We still check the lowerBound condition for individual messages so that we don't send
|
||||||
|
// messages that will be rejected by the mpool spam protector, so this is safe to do.
|
||||||
|
next := mp.createMessageChains(actor, mset, baseFeeLowerBound, ts)
|
||||||
chains = append(chains, next...)
|
chains = append(chains, next...)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -70,6 +75,7 @@ func (mp *MessagePool) republishPendingMessages() error {
|
|||||||
gasLimit := int64(build.BlockGasLimit)
|
gasLimit := int64(build.BlockGasLimit)
|
||||||
minGas := int64(gasguess.MinGas)
|
minGas := int64(gasguess.MinGas)
|
||||||
var msgs []*types.SignedMessage
|
var msgs []*types.SignedMessage
|
||||||
|
loop:
|
||||||
for i := 0; i < len(chains); {
|
for i := 0; i < len(chains); {
|
||||||
chain := chains[i]
|
chain := chains[i]
|
||||||
|
|
||||||
@ -91,8 +97,18 @@ func (mp *MessagePool) republishPendingMessages() error {
|
|||||||
|
|
||||||
// does it fit in a block?
|
// does it fit in a block?
|
||||||
if chain.gasLimit <= gasLimit {
|
if chain.gasLimit <= gasLimit {
|
||||||
gasLimit -= chain.gasLimit
|
// check the baseFee lower bound -- only republish messages that can be included in the chain
|
||||||
msgs = append(msgs, chain.msgs...)
|
// within the next 20 blocks.
|
||||||
|
for _, m := range chain.msgs {
|
||||||
|
if m.Message.GasFeeCap.LessThan(baseFeeLowerBound) {
|
||||||
|
chain.Invalidate()
|
||||||
|
continue loop
|
||||||
|
}
|
||||||
|
gasLimit -= m.Message.GasLimit
|
||||||
|
msgs = append(msgs, m)
|
||||||
|
}
|
||||||
|
|
||||||
|
// we processed the whole chain, advance
|
||||||
i++
|
i++
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user