Merge pull request #3450 from filecoin-project/feat/mpool-nonce-gaps

track expected nonce in mpool, ignore messages with large nonce gaps
This commit is contained in:
Aayush Rajasekaran 2020-09-03 12:27:32 -04:00 committed by GitHub
commit 5569f96885
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 147 additions and 33 deletions

View File

@ -52,6 +52,8 @@ var minimumBaseFee = types.NewInt(uint64(build.MinimumBaseFee))
var MaxActorPendingMessages = 1000 var MaxActorPendingMessages = 1000
var MaxNonceGap = uint64(4)
var ( var (
ErrMessageTooBig = errors.New("message too big") ErrMessageTooBig = errors.New("message too big")
@ -68,6 +70,7 @@ var (
ErrSoftValidationFailure = errors.New("validation failure") ErrSoftValidationFailure = errors.New("validation failure")
ErrRBFTooLowPremium = errors.New("replace by fee has too low GasPremium") ErrRBFTooLowPremium = errors.New("replace by fee has too low GasPremium")
ErrTooManyPendingMessages = errors.New("too many pending messages for actor") ErrTooManyPendingMessages = errors.New("too many pending messages for actor")
ErrNonceGap = errors.New("unfulfilled nonce gap")
ErrTryAgain = errors.New("state inconsistency while pushing message; please try again") ErrTryAgain = errors.New("state inconsistency while pushing message; please try again")
) )
@ -131,19 +134,39 @@ type msgSet struct {
requiredFunds *stdbig.Int requiredFunds *stdbig.Int
} }
func newMsgSet() *msgSet { func newMsgSet(nonce uint64) *msgSet {
return &msgSet{ return &msgSet{
msgs: make(map[uint64]*types.SignedMessage), msgs: make(map[uint64]*types.SignedMessage),
nextNonce: nonce,
requiredFunds: stdbig.NewInt(0), requiredFunds: stdbig.NewInt(0),
} }
} }
func (ms *msgSet) add(m *types.SignedMessage, mp *MessagePool, limit bool) (bool, error) { func (ms *msgSet) add(m *types.SignedMessage, mp *MessagePool, strict bool) (bool, error) {
if len(ms.msgs) == 0 || m.Message.Nonce >= ms.nextNonce { nextNonce := ms.nextNonce
ms.nextNonce = m.Message.Nonce + 1 nonceGap := false
switch {
case m.Message.Nonce == nextNonce:
nextNonce++
// advance if we are filling a gap
for _, fillGap := ms.msgs[nextNonce]; fillGap; _, fillGap = ms.msgs[nextNonce] {
nextNonce++
}
case strict && m.Message.Nonce > nextNonce+MaxNonceGap:
return false, xerrors.Errorf("message nonce has too big a gap from expected nonce (Nonce: %d, nextNonce: %d): %w", m.Message.Nonce, nextNonce, ErrNonceGap)
case m.Message.Nonce > nextNonce:
nonceGap = true
} }
exms, has := ms.msgs[m.Message.Nonce] exms, has := ms.msgs[m.Message.Nonce]
if has { if has {
// refuse RBF if we have a gap
if strict && nonceGap {
return false, xerrors.Errorf("rejecting replace by fee because of nonce gap (Nonce: %d, nextNonce: %d): %w", m.Message.Nonce, nextNonce, ErrNonceGap)
}
if m.Cid() != exms.Cid() { if m.Cid() != exms.Cid() {
// check if RBF passes // check if RBF passes
minPrice := exms.Message.GasPremium minPrice := exms.Message.GasPremium
@ -159,17 +182,26 @@ func (ms *msgSet) add(m *types.SignedMessage, mp *MessagePool, limit bool) (bool
m.Message.From, m.Message.Nonce, minPrice, m.Message.GasPremium, m.Message.From, m.Message.Nonce, minPrice, m.Message.GasPremium,
ErrRBFTooLowPremium) ErrRBFTooLowPremium)
} }
} else {
return false, xerrors.Errorf("message from %s with nonce %d already in mpool: %w",
m.Message.From, m.Message.Nonce, ErrSoftValidationFailure)
} }
ms.requiredFunds.Sub(ms.requiredFunds, exms.Message.RequiredFunds().Int) ms.requiredFunds.Sub(ms.requiredFunds, exms.Message.RequiredFunds().Int)
//ms.requiredFunds.Sub(ms.requiredFunds, exms.Message.Value.Int) //ms.requiredFunds.Sub(ms.requiredFunds, exms.Message.Value.Int)
} }
if !has && limit && len(ms.msgs) > MaxActorPendingMessages { if !has && strict && len(ms.msgs) > MaxActorPendingMessages {
log.Errorf("too many pending messages from actor %s", m.Message.From) log.Errorf("too many pending messages from actor %s", m.Message.From)
return false, ErrTooManyPendingMessages return false, ErrTooManyPendingMessages
} }
if strict && nonceGap {
log.Warnf("adding nonce-gapped message from %s (nonce: %d, nextNonce: %d)",
m.Message.From, m.Message.Nonce, nextNonce)
}
ms.nextNonce = nextNonce
ms.msgs[m.Message.Nonce] = m ms.msgs[m.Message.Nonce] = m
ms.requiredFunds.Add(ms.requiredFunds, m.Message.RequiredFunds().Int) ms.requiredFunds.Add(ms.requiredFunds, m.Message.RequiredFunds().Int)
//ms.requiredFunds.Add(ms.requiredFunds, m.Message.Value.Int) //ms.requiredFunds.Add(ms.requiredFunds, m.Message.Value.Int)
@ -177,12 +209,38 @@ func (ms *msgSet) add(m *types.SignedMessage, mp *MessagePool, limit bool) (bool
return !has, nil return !has, nil
} }
func (ms *msgSet) rm(nonce uint64) { func (ms *msgSet) rm(nonce uint64, applied bool) {
m, has := ms.msgs[nonce] m, has := ms.msgs[nonce]
if has { if !has {
ms.requiredFunds.Sub(ms.requiredFunds, m.Message.RequiredFunds().Int) if applied && nonce >= ms.nextNonce {
//ms.requiredFunds.Sub(ms.requiredFunds, m.Message.Value.Int) // we removed a message we did not know about because it was applied
delete(ms.msgs, nonce) // we need to adjust the nonce and check if we filled a gap
ms.nextNonce = nonce + 1
for _, fillGap := ms.msgs[ms.nextNonce]; fillGap; _, fillGap = ms.msgs[ms.nextNonce] {
ms.nextNonce++
}
}
return
}
ms.requiredFunds.Sub(ms.requiredFunds, m.Message.RequiredFunds().Int)
//ms.requiredFunds.Sub(ms.requiredFunds, m.Message.Value.Int)
delete(ms.msgs, nonce)
// adjust next nonce
if applied {
// we removed a (known) message because it was applied in a tipset
// we can't possibly have filled a gap in this case
if nonce >= ms.nextNonce {
ms.nextNonce = nonce + 1
}
return
}
// we removed a message because it was pruned
// we have to adjust the nonce if it creates a gap or rewinds state
if nonce < ms.nextNonce {
ms.nextNonce = nonce
} }
} }
@ -476,6 +534,40 @@ func (mp *MessagePool) addTs(m *types.SignedMessage, curTs *types.TipSet) error
return mp.addLocked(m, true) return mp.addLocked(m, true)
} }
func (mp *MessagePool) addLoaded(m *types.SignedMessage) error {
err := mp.checkMessage(m)
if err != nil {
return err
}
mp.curTsLk.Lock()
defer mp.curTsLk.Unlock()
curTs := mp.curTs
snonce, err := mp.getStateNonce(m.Message.From, curTs)
if err != nil {
return xerrors.Errorf("failed to look up actor state nonce: %s: %w", err, ErrSoftValidationFailure)
}
if snonce > m.Message.Nonce {
return xerrors.Errorf("minimum expected nonce is %d: %w", snonce, ErrNonceTooLow)
}
mp.lk.Lock()
defer mp.lk.Unlock()
if err := mp.verifyMsgBeforeAdd(m, curTs.Height()); err != nil {
return err
}
if err := mp.checkBalance(m, curTs); err != nil {
return err
}
return mp.addLocked(m, false)
}
func (mp *MessagePool) addSkipChecks(m *types.SignedMessage) error { func (mp *MessagePool) addSkipChecks(m *types.SignedMessage) error {
mp.lk.Lock() mp.lk.Lock()
defer mp.lk.Unlock() defer mp.lk.Unlock()
@ -483,7 +575,7 @@ func (mp *MessagePool) addSkipChecks(m *types.SignedMessage) error {
return mp.addLocked(m, false) return mp.addLocked(m, false)
} }
func (mp *MessagePool) addLocked(m *types.SignedMessage, limit bool) error { func (mp *MessagePool) addLocked(m *types.SignedMessage, strict bool) error {
log.Debugf("mpooladd: %s %d", m.Message.From, m.Message.Nonce) log.Debugf("mpooladd: %s %d", m.Message.From, m.Message.Nonce)
if m.Signature.Type == crypto.SigTypeBLS { if m.Signature.Type == crypto.SigTypeBLS {
mp.blsSigCache.Add(m.Cid(), m.Signature) mp.blsSigCache.Add(m.Cid(), m.Signature)
@ -501,11 +593,16 @@ func (mp *MessagePool) addLocked(m *types.SignedMessage, limit bool) error {
mset, ok := mp.pending[m.Message.From] mset, ok := mp.pending[m.Message.From]
if !ok { if !ok {
mset = newMsgSet() nonce, err := mp.getStateNonce(m.Message.From, mp.curTs)
if err != nil {
return xerrors.Errorf("failed to get initial actor nonce: %w", err)
}
mset = newMsgSet(nonce)
mp.pending[m.Message.From] = mset mp.pending[m.Message.From] = mset
} }
incr, err := mset.add(m, mp, limit) incr, err := mset.add(m, mp, strict)
if err != nil { if err != nil {
log.Info(err) log.Info(err)
return err return err
@ -664,14 +761,14 @@ func (mp *MessagePool) PushWithNonce(ctx context.Context, addr address.Address,
return msg, mp.api.PubSubPublish(build.MessagesTopic(mp.netName), msgb) return msg, mp.api.PubSubPublish(build.MessagesTopic(mp.netName), msgb)
} }
func (mp *MessagePool) Remove(from address.Address, nonce uint64) { func (mp *MessagePool) Remove(from address.Address, nonce uint64, applied bool) {
mp.lk.Lock() mp.lk.Lock()
defer mp.lk.Unlock() defer mp.lk.Unlock()
mp.remove(from, nonce) mp.remove(from, nonce, applied)
} }
func (mp *MessagePool) remove(from address.Address, nonce uint64) { func (mp *MessagePool) remove(from address.Address, nonce uint64, applied bool) {
mset, ok := mp.pending[from] mset, ok := mp.pending[from]
if !ok { if !ok {
return return
@ -688,22 +785,10 @@ func (mp *MessagePool) remove(from address.Address, nonce uint64) {
// NB: This deletes any message with the given nonce. This makes sense // NB: This deletes any message with the given nonce. This makes sense
// as two messages with the same sender cannot have the same nonce // as two messages with the same sender cannot have the same nonce
mset.rm(nonce) mset.rm(nonce, applied)
if len(mset.msgs) == 0 { if len(mset.msgs) == 0 {
delete(mp.pending, from) delete(mp.pending, from)
} else {
var max uint64
for nonce := range mset.msgs {
if max < nonce {
max = nonce
}
}
if max < nonce {
max = nonce // we could have not seen the removed message before
}
mset.nextNonce = max + 1
} }
} }
@ -771,7 +856,7 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet)
rm := func(from address.Address, nonce uint64) { rm := func(from address.Address, nonce uint64) {
s, ok := rmsgs[from] s, ok := rmsgs[from]
if !ok { if !ok {
mp.Remove(from, nonce) mp.Remove(from, nonce, true)
return return
} }
@ -780,7 +865,7 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet)
return return
} }
mp.Remove(from, nonce) mp.Remove(from, nonce, true)
} }
maybeRepub := func(cid cid.Cid) { maybeRepub := func(cid cid.Cid) {
@ -1082,7 +1167,7 @@ func (mp *MessagePool) loadLocal() error {
return xerrors.Errorf("unmarshaling local message: %w", err) return xerrors.Errorf("unmarshaling local message: %w", err)
} }
if err := mp.Add(&sm); err != nil { if err := mp.addLoaded(&sm); err != nil {
if xerrors.Is(err, ErrNonceTooLow) { if xerrors.Is(err, ErrNonceTooLow) {
continue // todo: drop the message from local cache (if above certain confidence threshold) continue // todo: drop the message from local cache (if above certain confidence threshold)
} }

View File

@ -352,6 +352,12 @@ func TestRevertMessages(t *testing.T) {
} }
func TestPruningSimple(t *testing.T) { func TestPruningSimple(t *testing.T) {
oldMaxNonceGap := MaxNonceGap
MaxNonceGap = 1000
defer func() {
MaxNonceGap = oldMaxNonceGap
}()
tma := newTestMpoolAPI() tma := newTestMpoolAPI()
w, err := wallet.NewWallet(wallet.NewMemKeyStore()) w, err := wallet.NewWallet(wallet.NewMemKeyStore())

View File

@ -98,7 +98,7 @@ keepLoop:
// and remove all messages that are still in pruneMsgs after processing the chains // and remove all messages that are still in pruneMsgs after processing the chains
log.Infof("Pruning %d messages", len(pruneMsgs)) log.Infof("Pruning %d messages", len(pruneMsgs))
for _, m := range pruneMsgs { for _, m := range pruneMsgs {
mp.remove(m.Message.From, m.Message.Nonce) mp.remove(m.Message.From, m.Message.Nonce, false)
} }
return nil return nil

View File

@ -3,6 +3,7 @@ package messagepool
import ( import (
"context" "context"
"sort" "sort"
"time"
"golang.org/x/xerrors" "golang.org/x/xerrors"
@ -15,6 +16,8 @@ import (
const repubMsgLimit = 30 const repubMsgLimit = 30
var RepublishBatchDelay = 200 * time.Millisecond
func (mp *MessagePool) republishPendingMessages() error { func (mp *MessagePool) republishPendingMessages() error {
mp.curTsLk.Lock() mp.curTsLk.Lock()
ts := mp.curTs ts := mp.curTs
@ -131,6 +134,12 @@ func (mp *MessagePool) republishPendingMessages() error {
} }
count++ count++
if count < len(msgs) {
// this delay is here to encourage the pubsub subsystem to process the messages serially
// and avoid creating nonce gaps because of concurrent validation.
time.Sleep(RepublishBatchDelay)
}
} }
// track most recently republished messages // track most recently republished messages

View File

@ -12,6 +12,12 @@ import (
) )
func TestRepubMessages(t *testing.T) { func TestRepubMessages(t *testing.T) {
oldRepublishBatchDelay := RepublishBatchDelay
RepublishBatchDelay = time.Microsecond
defer func() {
RepublishBatchDelay = oldRepublishBatchDelay
}()
tma := newTestMpoolAPI() tma := newTestMpoolAPI()
ds := datastore.NewMapDatastore() ds := datastore.NewMapDatastore()

View File

@ -369,6 +369,12 @@ func TestMessageChainSkipping(t *testing.T) {
} }
func TestBasicMessageSelection(t *testing.T) { func TestBasicMessageSelection(t *testing.T) {
oldMaxNonceGap := MaxNonceGap
MaxNonceGap = 1000
defer func() {
MaxNonceGap = oldMaxNonceGap
}()
mp, tma := makeTestMpool() mp, tma := makeTestMpool()
// the actors // the actors

View File

@ -555,6 +555,8 @@ func (mv *MessageValidator) Validate(ctx context.Context, pid peer.ID, msg *pubs
fallthrough fallthrough
case xerrors.Is(err, messagepool.ErrTooManyPendingMessages): case xerrors.Is(err, messagepool.ErrTooManyPendingMessages):
fallthrough fallthrough
case xerrors.Is(err, messagepool.ErrNonceGap):
fallthrough
case xerrors.Is(err, messagepool.ErrNonceTooLow): case xerrors.Is(err, messagepool.ErrNonceTooLow):
return pubsub.ValidationIgnore return pubsub.ValidationIgnore
default: default: