Merge pull request #3313 from filecoin-project/feat/mpool-balance-tracking
Message Pool Rudimentary Spam Protection Measures
This commit is contained in:
commit
84a632787c
@ -6,6 +6,7 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math"
|
"math"
|
||||||
|
stdbig "math/big"
|
||||||
"sort"
|
"sort"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
@ -47,6 +48,10 @@ const RbfDenom = 256
|
|||||||
|
|
||||||
var RepublishInterval = pubsub.TimeCacheDuration + time.Duration(5*build.BlockDelaySecs+build.PropagationDelaySecs)*time.Second
|
var RepublishInterval = pubsub.TimeCacheDuration + time.Duration(5*build.BlockDelaySecs+build.PropagationDelaySecs)*time.Second
|
||||||
|
|
||||||
|
var minimumBaseFee = types.NewInt(uint64(build.MinimumBaseFee))
|
||||||
|
|
||||||
|
var MaxActorPendingMessages = 1000
|
||||||
|
|
||||||
var (
|
var (
|
||||||
ErrMessageTooBig = errors.New("message too big")
|
ErrMessageTooBig = errors.New("message too big")
|
||||||
|
|
||||||
@ -54,12 +59,15 @@ var (
|
|||||||
|
|
||||||
ErrNonceTooLow = errors.New("message nonce too low")
|
ErrNonceTooLow = errors.New("message nonce too low")
|
||||||
|
|
||||||
|
ErrGasFeeCapTooLow = errors.New("gas fee cap too low")
|
||||||
|
|
||||||
ErrNotEnoughFunds = errors.New("not enough funds to execute transaction")
|
ErrNotEnoughFunds = errors.New("not enough funds to execute transaction")
|
||||||
|
|
||||||
ErrInvalidToAddr = errors.New("message had invalid to address")
|
ErrInvalidToAddr = errors.New("message had invalid to address")
|
||||||
|
|
||||||
ErrBroadcastAnyway = errors.New("broadcasting message despite validation fail")
|
ErrSoftValidationFailure = errors.New("validation failure")
|
||||||
ErrRBFTooLowPremium = errors.New("replace by fee has too low GasPremium")
|
ErrRBFTooLowPremium = errors.New("replace by fee has too low GasPremium")
|
||||||
|
ErrTooManyPendingMessages = errors.New("too many pending messages for actor")
|
||||||
|
|
||||||
ErrTryAgain = errors.New("state inconsistency while pushing message; please try again")
|
ErrTryAgain = errors.New("state inconsistency while pushing message; please try again")
|
||||||
)
|
)
|
||||||
@ -118,17 +126,19 @@ type MessagePool struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type msgSet struct {
|
type msgSet struct {
|
||||||
msgs map[uint64]*types.SignedMessage
|
msgs map[uint64]*types.SignedMessage
|
||||||
nextNonce uint64
|
nextNonce uint64
|
||||||
|
requiredFunds *stdbig.Int
|
||||||
}
|
}
|
||||||
|
|
||||||
func newMsgSet() *msgSet {
|
func newMsgSet() *msgSet {
|
||||||
return &msgSet{
|
return &msgSet{
|
||||||
msgs: make(map[uint64]*types.SignedMessage),
|
msgs: make(map[uint64]*types.SignedMessage),
|
||||||
|
requiredFunds: stdbig.NewInt(0),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ms *msgSet) add(m *types.SignedMessage, mp *MessagePool) (bool, error) {
|
func (ms *msgSet) add(m *types.SignedMessage, mp *MessagePool, limit bool) (bool, error) {
|
||||||
if len(ms.msgs) == 0 || m.Message.Nonce >= ms.nextNonce {
|
if len(ms.msgs) == 0 || m.Message.Nonce >= ms.nextNonce {
|
||||||
ms.nextNonce = m.Message.Nonce + 1
|
ms.nextNonce = m.Message.Nonce + 1
|
||||||
}
|
}
|
||||||
@ -150,12 +160,44 @@ func (ms *msgSet) add(m *types.SignedMessage, mp *MessagePool) (bool, error) {
|
|||||||
ErrRBFTooLowPremium)
|
ErrRBFTooLowPremium)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ms.requiredFunds.Sub(ms.requiredFunds, exms.Message.RequiredFunds().Int)
|
||||||
|
//ms.requiredFunds.Sub(ms.requiredFunds, exms.Message.Value.Int)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !has && limit && len(ms.msgs) > MaxActorPendingMessages {
|
||||||
|
log.Errorf("too many pending messages from actor %s", m.Message.From)
|
||||||
|
return false, ErrTooManyPendingMessages
|
||||||
|
}
|
||||||
|
|
||||||
ms.msgs[m.Message.Nonce] = m
|
ms.msgs[m.Message.Nonce] = m
|
||||||
|
ms.requiredFunds.Add(ms.requiredFunds, m.Message.RequiredFunds().Int)
|
||||||
|
//ms.requiredFunds.Add(ms.requiredFunds, m.Message.Value.Int)
|
||||||
|
|
||||||
return !has, nil
|
return !has, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (ms *msgSet) rm(nonce uint64) {
|
||||||
|
m, has := ms.msgs[nonce]
|
||||||
|
if has {
|
||||||
|
ms.requiredFunds.Sub(ms.requiredFunds, m.Message.RequiredFunds().Int)
|
||||||
|
//ms.requiredFunds.Sub(ms.requiredFunds, m.Message.Value.Int)
|
||||||
|
delete(ms.msgs, nonce)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ms *msgSet) getRequiredFunds(nonce uint64) types.BigInt {
|
||||||
|
requiredFunds := new(stdbig.Int).Set(ms.requiredFunds)
|
||||||
|
|
||||||
|
m, has := ms.msgs[nonce]
|
||||||
|
if has {
|
||||||
|
requiredFunds.Sub(requiredFunds, m.Message.RequiredFunds().Int)
|
||||||
|
//requiredFunds.Sub(requiredFunds, m.Message.Value.Int)
|
||||||
|
}
|
||||||
|
|
||||||
|
return types.BigInt{Int: requiredFunds}
|
||||||
|
}
|
||||||
|
|
||||||
func New(api Provider, ds dtypes.MetadataDS, netName dtypes.NetworkName) (*MessagePool, error) {
|
func New(api Provider, ds dtypes.MetadataDS, netName dtypes.NetworkName) (*MessagePool, error) {
|
||||||
cache, _ := lru.New2Q(build.BlsSignatureCacheSize)
|
cache, _ := lru.New2Q(build.BlsSignatureCacheSize)
|
||||||
verifcache, _ := lru.New2Q(build.VerifSigCacheSize)
|
verifcache, _ := lru.New2Q(build.VerifSigCacheSize)
|
||||||
@ -257,7 +299,7 @@ func (mp *MessagePool) addLocal(m *types.SignedMessage, msgb []byte) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mp *MessagePool) verifyMsgBeforePush(m *types.SignedMessage, epoch abi.ChainEpoch) error {
|
func (mp *MessagePool) verifyMsgBeforeAdd(m *types.SignedMessage, epoch abi.ChainEpoch) error {
|
||||||
minGas := vm.PricelistByEpoch(epoch).OnChainMessage(m.ChainLength())
|
minGas := vm.PricelistByEpoch(epoch).OnChainMessage(m.ChainLength())
|
||||||
|
|
||||||
if err := m.VMMessage().ValidForBlockInclusion(minGas.Total()); err != nil {
|
if err := m.VMMessage().ValidForBlockInclusion(minGas.Total()); err != nil {
|
||||||
@ -278,25 +320,12 @@ func (mp *MessagePool) Push(m *types.SignedMessage) (cid.Cid, error) {
|
|||||||
<-mp.addSema
|
<-mp.addSema
|
||||||
}()
|
}()
|
||||||
|
|
||||||
mp.curTsLk.Lock()
|
|
||||||
curTs := mp.curTs
|
|
||||||
epoch := curTs.Height()
|
|
||||||
mp.curTsLk.Unlock()
|
|
||||||
if err := mp.verifyMsgBeforePush(m, epoch); err != nil {
|
|
||||||
return cid.Undef, err
|
|
||||||
}
|
|
||||||
|
|
||||||
msgb, err := m.Serialize()
|
msgb, err := m.Serialize()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return cid.Undef, err
|
return cid.Undef, err
|
||||||
}
|
}
|
||||||
|
|
||||||
mp.curTsLk.Lock()
|
mp.curTsLk.Lock()
|
||||||
if mp.curTs != curTs {
|
|
||||||
mp.curTsLk.Unlock()
|
|
||||||
return cid.Undef, ErrTryAgain
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := mp.addTs(m, mp.curTs); err != nil {
|
if err := mp.addTs(m, mp.curTs); err != nil {
|
||||||
mp.curTsLk.Unlock()
|
mp.curTsLk.Unlock()
|
||||||
return cid.Undef, err
|
return cid.Undef, err
|
||||||
@ -319,7 +348,7 @@ func (mp *MessagePool) checkMessage(m *types.SignedMessage) error {
|
|||||||
return xerrors.Errorf("mpool message too large (%dB): %w", m.Size(), ErrMessageTooBig)
|
return xerrors.Errorf("mpool message too large (%dB): %w", m.Size(), ErrMessageTooBig)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Perform syntaxtic validation, minGas=0 as we check if correctly in select messages
|
// Perform syntactic validation, minGas=0 as we check the actual mingas before we add it
|
||||||
if err := m.Message.ValidForBlockInclusion(0); err != nil {
|
if err := m.Message.ValidForBlockInclusion(0); err != nil {
|
||||||
return xerrors.Errorf("message not valid for block inclusion: %w", err)
|
return xerrors.Errorf("message not valid for block inclusion: %w", err)
|
||||||
}
|
}
|
||||||
@ -332,8 +361,12 @@ func (mp *MessagePool) checkMessage(m *types.SignedMessage) error {
|
|||||||
return ErrMessageValueTooHigh
|
return ErrMessageValueTooHigh
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if m.Message.GasFeeCap.LessThan(minimumBaseFee) {
|
||||||
|
return ErrGasFeeCapTooLow
|
||||||
|
}
|
||||||
|
|
||||||
if err := mp.VerifyMsgSig(m); err != nil {
|
if err := mp.VerifyMsgSig(m); err != nil {
|
||||||
log.Warnf("mpooladd signature verification failed: %s", err)
|
log.Warnf("signature verification failed: %s", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -393,48 +426,71 @@ func (mp *MessagePool) VerifyMsgSig(m *types.SignedMessage) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (mp *MessagePool) checkBalance(m *types.SignedMessage, curTs *types.TipSet) error {
|
||||||
|
balance, err := mp.getStateBalance(m.Message.From, curTs)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("failed to check sender balance: %s: %w", err, ErrSoftValidationFailure)
|
||||||
|
}
|
||||||
|
|
||||||
|
requiredFunds := m.Message.RequiredFunds()
|
||||||
|
if balance.LessThan(requiredFunds) {
|
||||||
|
return xerrors.Errorf("not enough funds (required: %s, balance: %s): %w", types.FIL(requiredFunds), types.FIL(balance), ErrNotEnoughFunds)
|
||||||
|
}
|
||||||
|
|
||||||
|
// add Value for soft failure check
|
||||||
|
//requiredFunds = types.BigAdd(requiredFunds, m.Message.Value)
|
||||||
|
|
||||||
|
mset, ok := mp.pending[m.Message.From]
|
||||||
|
if ok {
|
||||||
|
requiredFunds = types.BigAdd(requiredFunds, mset.getRequiredFunds(m.Message.Nonce))
|
||||||
|
}
|
||||||
|
|
||||||
|
if balance.LessThan(requiredFunds) {
|
||||||
|
// Note: we fail here for ErrSoftValidationFailure to signal a soft failure because we might
|
||||||
|
// be out of sync.
|
||||||
|
return xerrors.Errorf("not enough funds including pending messages (required: %s, balance: %s): %w", types.FIL(requiredFunds), types.FIL(balance), ErrSoftValidationFailure)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (mp *MessagePool) addTs(m *types.SignedMessage, curTs *types.TipSet) error {
|
func (mp *MessagePool) addTs(m *types.SignedMessage, curTs *types.TipSet) error {
|
||||||
snonce, err := mp.getStateNonce(m.Message.From, curTs)
|
snonce, err := mp.getStateNonce(m.Message.From, curTs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("failed to look up actor state nonce: %s: %w", err, ErrBroadcastAnyway)
|
return xerrors.Errorf("failed to look up actor state nonce: %s: %w", err, ErrSoftValidationFailure)
|
||||||
}
|
}
|
||||||
|
|
||||||
if snonce > m.Message.Nonce {
|
if snonce > m.Message.Nonce {
|
||||||
return xerrors.Errorf("minimum expected nonce is %d: %w", snonce, ErrNonceTooLow)
|
return xerrors.Errorf("minimum expected nonce is %d: %w", snonce, ErrNonceTooLow)
|
||||||
}
|
}
|
||||||
|
|
||||||
balance, err := mp.getStateBalance(m.Message.From, curTs)
|
|
||||||
if err != nil {
|
|
||||||
return xerrors.Errorf("failed to check sender balance: %s: %w", err, ErrBroadcastAnyway)
|
|
||||||
}
|
|
||||||
|
|
||||||
if balance.LessThan(m.Message.RequiredFunds()) {
|
|
||||||
return xerrors.Errorf("not enough funds (required: %s, balance: %s): %w", types.FIL(m.Message.RequiredFunds()), types.FIL(balance), ErrNotEnoughFunds)
|
|
||||||
}
|
|
||||||
|
|
||||||
mp.lk.Lock()
|
mp.lk.Lock()
|
||||||
defer mp.lk.Unlock()
|
defer mp.lk.Unlock()
|
||||||
|
|
||||||
return mp.addLocked(m)
|
if err := mp.verifyMsgBeforeAdd(m, curTs.Height()); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := mp.checkBalance(m, curTs); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return mp.addLocked(m, true)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mp *MessagePool) addSkipChecks(m *types.SignedMessage) error {
|
func (mp *MessagePool) addSkipChecks(m *types.SignedMessage) error {
|
||||||
mp.lk.Lock()
|
mp.lk.Lock()
|
||||||
defer mp.lk.Unlock()
|
defer mp.lk.Unlock()
|
||||||
|
|
||||||
return mp.addLocked(m)
|
return mp.addLocked(m, false)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mp *MessagePool) addLocked(m *types.SignedMessage) error {
|
func (mp *MessagePool) addLocked(m *types.SignedMessage, limit bool) error {
|
||||||
log.Debugf("mpooladd: %s %d", m.Message.From, m.Message.Nonce)
|
log.Debugf("mpooladd: %s %d", m.Message.From, m.Message.Nonce)
|
||||||
if m.Signature.Type == crypto.SigTypeBLS {
|
if m.Signature.Type == crypto.SigTypeBLS {
|
||||||
mp.blsSigCache.Add(m.Cid(), m.Signature)
|
mp.blsSigCache.Add(m.Cid(), m.Signature)
|
||||||
}
|
}
|
||||||
|
|
||||||
if m.Message.GasLimit > build.BlockGasLimit {
|
|
||||||
return xerrors.Errorf("given message has too high of a gas limit")
|
|
||||||
}
|
|
||||||
|
|
||||||
if _, err := mp.api.PutMessage(m); err != nil {
|
if _, err := mp.api.PutMessage(m); err != nil {
|
||||||
log.Warnf("mpooladd cs.PutMessage failed: %s", err)
|
log.Warnf("mpooladd cs.PutMessage failed: %s", err)
|
||||||
return err
|
return err
|
||||||
@ -451,7 +507,7 @@ func (mp *MessagePool) addLocked(m *types.SignedMessage) error {
|
|||||||
mp.pending[m.Message.From] = mset
|
mp.pending[m.Message.From] = mset
|
||||||
}
|
}
|
||||||
|
|
||||||
incr, err := mset.add(m, mp)
|
incr, err := mset.add(m, mp, limit)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Info(err)
|
log.Info(err)
|
||||||
return err
|
return err
|
||||||
@ -562,6 +618,16 @@ func (mp *MessagePool) PushWithNonce(ctx context.Context, addr address.Address,
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
err = mp.checkMessage(msg)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
msgb, err := msg.Serialize()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
// reacquire the locks and check state for consistency
|
// reacquire the locks and check state for consistency
|
||||||
mp.curTsLk.Lock()
|
mp.curTsLk.Lock()
|
||||||
defer mp.curTsLk.Unlock()
|
defer mp.curTsLk.Unlock()
|
||||||
@ -582,16 +648,15 @@ func (mp *MessagePool) PushWithNonce(ctx context.Context, addr address.Address,
|
|||||||
return nil, ErrTryAgain
|
return nil, ErrTryAgain
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := mp.verifyMsgBeforePush(msg, mp.curTs.Height()); err != nil {
|
if err := mp.verifyMsgBeforeAdd(msg, curTs.Height()); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
msgb, err := msg.Serialize()
|
if err := mp.checkBalance(msg, curTs); err != nil {
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := mp.addLocked(msg); err != nil {
|
if err := mp.addLocked(msg, true); err != nil {
|
||||||
return nil, xerrors.Errorf("add locked failed: %w", err)
|
return nil, xerrors.Errorf("add locked failed: %w", err)
|
||||||
}
|
}
|
||||||
if err := mp.addLocal(msg, msgb); err != nil {
|
if err := mp.addLocal(msg, msgb); err != nil {
|
||||||
@ -625,7 +690,7 @@ func (mp *MessagePool) remove(from address.Address, nonce uint64) {
|
|||||||
|
|
||||||
// NB: This deletes any message with the given nonce. This makes sense
|
// NB: This deletes any message with the given nonce. This makes sense
|
||||||
// as two messages with the same sender cannot have the same nonce
|
// as two messages with the same sender cannot have the same nonce
|
||||||
delete(mset.msgs, nonce)
|
mset.rm(nonce)
|
||||||
|
|
||||||
if len(mset.msgs) == 0 {
|
if len(mset.msgs) == 0 {
|
||||||
delete(mp.pending, from)
|
delete(mp.pending, from)
|
||||||
|
@ -373,6 +373,7 @@ func TestPruningSimple(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
tma.setBalance(sender, 1) // in FIL
|
||||||
target := mock.Address(1001)
|
target := mock.Address(1001)
|
||||||
|
|
||||||
for i := 0; i < 5; i++ {
|
for i := 0; i < 5; i++ {
|
||||||
@ -430,6 +431,8 @@ func TestLoadLocal(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tma.setBalance(a1, 1) // in FIL
|
||||||
|
tma.setBalance(a2, 1) // in FIL
|
||||||
gasLimit := gasguess.Costs[gasguess.CostKey{Code: builtin.StorageMarketActorCodeID, M: 2}]
|
gasLimit := gasguess.Costs[gasguess.CostKey{Code: builtin.StorageMarketActorCodeID, M: 2}]
|
||||||
msgs := make(map[cid.Cid]struct{})
|
msgs := make(map[cid.Cid]struct{})
|
||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
@ -500,6 +503,8 @@ func TestClearAll(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tma.setBalance(a1, 1) // in FIL
|
||||||
|
tma.setBalance(a2, 1) // in FIL
|
||||||
gasLimit := gasguess.Costs[gasguess.CostKey{Code: builtin.StorageMarketActorCodeID, M: 2}]
|
gasLimit := gasguess.Costs[gasguess.CostKey{Code: builtin.StorageMarketActorCodeID, M: 2}]
|
||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(i+1))
|
m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(i+1))
|
||||||
@ -552,6 +557,9 @@ func TestClearNonLocal(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tma.setBalance(a1, 1) // in FIL
|
||||||
|
tma.setBalance(a2, 1) // in FIL
|
||||||
|
|
||||||
gasLimit := gasguess.Costs[gasguess.CostKey{Code: builtin.StorageMarketActorCodeID, M: 2}]
|
gasLimit := gasguess.Costs[gasguess.CostKey{Code: builtin.StorageMarketActorCodeID, M: 2}]
|
||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(i+1))
|
m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(i+1))
|
||||||
@ -619,6 +627,10 @@ func TestUpdates(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
gasLimit := gasguess.Costs[gasguess.CostKey{Code: builtin.StorageMarketActorCodeID, M: 2}]
|
gasLimit := gasguess.Costs[gasguess.CostKey{Code: builtin.StorageMarketActorCodeID, M: 2}]
|
||||||
|
|
||||||
|
tma.setBalance(a1, 1) // in FIL
|
||||||
|
tma.setBalance(a2, 1) // in FIL
|
||||||
|
|
||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(i+1))
|
m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(i+1))
|
||||||
_, err := mp.Push(m)
|
_, err := mp.Push(m)
|
||||||
|
@ -23,6 +23,11 @@ import (
|
|||||||
logging "github.com/ipfs/go-log"
|
logging "github.com/ipfs/go-log"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
// bump this for the selection tests
|
||||||
|
MaxActorPendingMessages = 1000000
|
||||||
|
}
|
||||||
|
|
||||||
func makeTestMessage(w *wallet.Wallet, from, to address.Address, nonce uint64, gasLimit int64, gasPrice uint64) *types.SignedMessage {
|
func makeTestMessage(w *wallet.Wallet, from, to address.Address, nonce uint64, gasLimit int64, gasPrice uint64) *types.SignedMessage {
|
||||||
msg := &types.Message{
|
msg := &types.Message{
|
||||||
From: from,
|
From: from,
|
||||||
|
@ -549,10 +549,12 @@ func (mv *MessageValidator) Validate(ctx context.Context, pid peer.ID, msg *pubs
|
|||||||
)
|
)
|
||||||
recordFailure(ctx, metrics.MessageValidationFailure, "add")
|
recordFailure(ctx, metrics.MessageValidationFailure, "add")
|
||||||
switch {
|
switch {
|
||||||
case xerrors.Is(err, messagepool.ErrBroadcastAnyway):
|
case xerrors.Is(err, messagepool.ErrSoftValidationFailure):
|
||||||
fallthrough
|
fallthrough
|
||||||
case xerrors.Is(err, messagepool.ErrRBFTooLowPremium):
|
case xerrors.Is(err, messagepool.ErrRBFTooLowPremium):
|
||||||
fallthrough
|
fallthrough
|
||||||
|
case xerrors.Is(err, messagepool.ErrTooManyPendingMessages):
|
||||||
|
fallthrough
|
||||||
case xerrors.Is(err, messagepool.ErrNonceTooLow):
|
case xerrors.Is(err, messagepool.ErrNonceTooLow):
|
||||||
return pubsub.ValidationIgnore
|
return pubsub.ValidationIgnore
|
||||||
default:
|
default:
|
||||||
|
@ -126,7 +126,7 @@ func GossipSub(in GossipIn) (service *pubsub.PubSub, err error) {
|
|||||||
Topics: map[string]*pubsub.TopicScoreParams{
|
Topics: map[string]*pubsub.TopicScoreParams{
|
||||||
drandTopic: {
|
drandTopic: {
|
||||||
// expected 2 beaconsn/min
|
// expected 2 beaconsn/min
|
||||||
TopicWeight: 0.5, // 5x block topic
|
TopicWeight: 0.5, // 5x block topic; max cap is 62.5
|
||||||
|
|
||||||
// 1 tick per second, maxes at 1 after 1 hour
|
// 1 tick per second, maxes at 1 after 1 hour
|
||||||
TimeInMeshWeight: 0.00027, // ~1/3600
|
TimeInMeshWeight: 0.00027, // ~1/3600
|
||||||
@ -154,7 +154,7 @@ func GossipSub(in GossipIn) (service *pubsub.PubSub, err error) {
|
|||||||
},
|
},
|
||||||
build.BlocksTopic(in.Nn): {
|
build.BlocksTopic(in.Nn): {
|
||||||
// expected 10 blocks/min
|
// expected 10 blocks/min
|
||||||
TopicWeight: 0.1, // max is 50, max mesh penalty is -10, single invalid message is -100
|
TopicWeight: 0.1, // max cap is 50, max mesh penalty is -10, single invalid message is -100
|
||||||
|
|
||||||
// 1 tick per second, maxes at 1 after 1 hour
|
// 1 tick per second, maxes at 1 after 1 hour
|
||||||
TimeInMeshWeight: 0.00027, // ~1/3600
|
TimeInMeshWeight: 0.00027, // ~1/3600
|
||||||
@ -195,18 +195,17 @@ func GossipSub(in GossipIn) (service *pubsub.PubSub, err error) {
|
|||||||
},
|
},
|
||||||
build.MessagesTopic(in.Nn): {
|
build.MessagesTopic(in.Nn): {
|
||||||
// expected > 1 tx/second
|
// expected > 1 tx/second
|
||||||
TopicWeight: 0.05, // max is 25, max mesh penalty is -5, single invalid message is -100
|
TopicWeight: 0.1, // max cap is 5, single invalid message is -100
|
||||||
|
|
||||||
// 1 tick per second, maxes at 1 hour
|
// 1 tick per second, maxes at 1 hour
|
||||||
TimeInMeshWeight: 0.0002778, // ~1/3600
|
TimeInMeshWeight: 0.0002778, // ~1/3600
|
||||||
TimeInMeshQuantum: time.Second,
|
TimeInMeshQuantum: time.Second,
|
||||||
TimeInMeshCap: 1,
|
TimeInMeshCap: 1,
|
||||||
|
|
||||||
// deliveries decay after 10min, cap at 1000 tx
|
// deliveries decay after 10min, cap at 100 tx
|
||||||
FirstMessageDeliveriesWeight: 0.5, // max value is 500
|
FirstMessageDeliveriesWeight: 0.5, // max value is 50
|
||||||
FirstMessageDeliveriesDecay: pubsub.ScoreParameterDecay(10 * time.Minute),
|
FirstMessageDeliveriesDecay: pubsub.ScoreParameterDecay(10 * time.Minute),
|
||||||
//FirstMessageDeliveriesCap: 1000,
|
FirstMessageDeliveriesCap: 100, // 100 messages in 10 minutes
|
||||||
FirstMessageDeliveriesCap: 1, // we can't yet properly validate them so only confer a tiny boost from delivery
|
|
||||||
|
|
||||||
// Mesh Delivery Failure is currently turned off for messages
|
// Mesh Delivery Failure is currently turned off for messages
|
||||||
// This is on purpose as the network is still too small, which results in
|
// This is on purpose as the network is still too small, which results in
|
||||||
@ -225,7 +224,7 @@ func GossipSub(in GossipIn) (service *pubsub.PubSub, err error) {
|
|||||||
// MeshFailurePenaltyDecay: pubsub.ScoreParameterDecay(5 * time.Minute),
|
// MeshFailurePenaltyDecay: pubsub.ScoreParameterDecay(5 * time.Minute),
|
||||||
|
|
||||||
// invalid messages decay after 1 hour
|
// invalid messages decay after 1 hour
|
||||||
InvalidMessageDeliveriesWeight: -2000,
|
InvalidMessageDeliveriesWeight: -1000,
|
||||||
InvalidMessageDeliveriesDecay: pubsub.ScoreParameterDecay(time.Hour),
|
InvalidMessageDeliveriesDecay: pubsub.ScoreParameterDecay(time.Hour),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
Loading…
Reference in New Issue
Block a user