WIP: message pool pruning

whyrusleeping 2020-07-16 15:28:35 -07:00
parent 113681fcc4
commit fe80f4b830
2 changed files with 84 additions and 19 deletions


@@ -44,6 +44,9 @@ const ReplaceByFeeRatio = 1.25
 const repubMsgLimit = 5
 
+var MemPoolSizeLimitHiDefault = 50000
+var MemPoolSizeLimitLoDefault = 40000
+
 var (
     rbfNum   = types.NewInt(uint64((ReplaceByFeeRatio - 1) * 256))
     rbfDenom = types.NewInt(256)
@@ -86,7 +89,12 @@ type MessagePool struct {
     minGasPrice types.BigInt
 
-    maxTxPoolSize int
+    currentSize int
+
+    maxTxPoolSizeHi int
+    maxTxPoolSizeLo int
+
+    // pruneTrigger is a channel used to trigger a mempool pruning
+    pruneTrigger chan struct{}
 
     blsSigCache *lru.TwoQueueCache
@@ -110,7 +118,7 @@ func newMsgSet() *msgSet {
     }
 }
 
-func (ms *msgSet) add(m *types.SignedMessage) error {
+func (ms *msgSet) add(m *types.SignedMessage) (bool, error) {
     if len(ms.msgs) == 0 || m.Message.Nonce >= ms.nextNonce {
         ms.nextNonce = m.Message.Nonce + 1
     }
@@ -126,7 +134,7 @@ func (ms *msgSet) add(m *types.SignedMessage) error {
                 "newprice", m.Message.GasPrice, "addr", m.Message.From, "nonce", m.Message.Nonce)
         } else {
             log.Info("add with duplicate nonce")
-            return xerrors.Errorf("message from %s with nonce %d already in mpool,"+
+            return false, xerrors.Errorf("message from %s with nonce %d already in mpool,"+
                 " increase GasPrice to %s from %s to trigger replace by fee",
                 m.Message.From, m.Message.Nonce, minPrice, m.Message.GasPrice)
         }
@@ -134,7 +142,7 @@ func (ms *msgSet) add(m *types.SignedMessage) error {
     }
 
     ms.msgs[m.Message.Nonce] = m
-    return nil
+    return !has, nil
 }
 
 type Provider interface {
@@ -196,25 +204,27 @@ func New(api Provider, ds dtypes.MetadataDS, netName dtypes.NetworkName) (*Messa
     verifcache, _ := lru.New2Q(build.VerifSigCacheSize)
 
     mp := &MessagePool{
         closer:      make(chan struct{}),
         repubTk:     build.Clock.Ticker(time.Duration(build.BlockDelaySecs) * 10 * time.Second),
         localAddrs:  make(map[address.Address]struct{}),
         pending:     make(map[address.Address]*msgSet),
         minGasPrice: types.NewInt(0),
-        maxTxPoolSize: 5000,
-        blsSigCache:   cache,
-        sigValCache:   verifcache,
-        changes:       lps.New(50),
-        localMsgs:     namespace.Wrap(ds, datastore.NewKey(localMsgsDs)),
-        api:           api,
-        netName:       netName,
+        maxTxPoolSizeHi: MemPoolSizeLimitHiDefault,
+        maxTxPoolSizeLo: MemPoolSizeLimitLoDefault,
+        pruneTrigger:    make(chan struct{}, 1),
+        blsSigCache:     cache,
+        sigValCache:     verifcache,
+        changes:         lps.New(50),
+        localMsgs:       namespace.Wrap(ds, datastore.NewKey(localMsgsDs)),
+        api:             api,
+        netName:         netName,
     }
 
     if err := mp.loadLocal(); err != nil {
         log.Errorf("loading local messages: %+v", err)
     }
 
-    go mp.repubLocal()
+    go mp.runLoop()
 
     mp.curTs = api.SubscribeHeadChanges(func(rev, app []*types.TipSet) error {
         err := mp.HeadChange(rev, app)
@@ -232,7 +242,7 @@ func (mp *MessagePool) Close() error {
     return nil
 }
 
-func (mp *MessagePool) repubLocal() {
+func (mp *MessagePool) runLoop() {
     for {
         select {
         case <-mp.repubTk.C:
@@ -294,6 +304,10 @@ func (mp *MessagePool) repubLocal() {
             if errout != nil {
                 log.Errorf("errors while republishing: %+v", errout)
             }
+        case <-mp.pruneTrigger:
+            if err := mp.pruneExcessMessages(); err != nil {
+                log.Errorf("failed to prune excess messages from mempool: %s", err)
+            }
         case <-mp.closer:
             mp.repubTk.Stop()
             return
@@ -466,8 +480,21 @@ func (mp *MessagePool) addLocked(m *types.SignedMessage) error {
         mp.pending[m.Message.From] = mset
     }
 
-    if err := mset.add(m); err != nil {
+    incr, err := mset.add(m)
+    if err != nil {
         log.Info(err)
-        return err
+        // TODO(review): this error return was dropped at some point, was it on purpose?
+    }
+
+    if incr {
+        mp.currentSize++
+        if mp.currentSize > mp.maxTxPoolSizeHi {
+            // send signal to prune messages if it hasnt already been sent
+            select {
+            case mp.pruneTrigger <- struct{}{}:
+            default:
+            }
+        }
     }
 
     mp.changes.Pub(api.MpoolUpdate{
@@ -610,6 +637,8 @@ func (mp *MessagePool) Remove(from address.Address, nonce uint64) {
             Type:    api.MpoolRemove,
             Message: m,
         }, localUpdates)
+
+        mp.currentSize--
     }
 
     // NB: This deletes any message with the given nonce. This makes sense
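The add path only bumps mp.currentSize and fires pruneTrigger; the pruning itself runs on the runLoop goroutine and lives in the new file added below. Because pruneTrigger is buffered with capacity 1 and the send uses a select with a default branch, any number of over-the-high-watermark additions collapse into at most one pending prune. A minimal standalone sketch of that pattern (names here are illustrative, not the mpool's):

package main

import "fmt"

func main() {
	// Capacity-1 buffer: at most one prune request can be queued at a time.
	pruneTrigger := make(chan struct{}, 1)

	// Producer side (addLocked-style): non-blocking send. If a prune is
	// already queued, the default branch drops the duplicate signal.
	requestPrune := func() bool {
		select {
		case pruneTrigger <- struct{}{}:
			return true
		default:
			return false
		}
	}

	// Three back-to-back requests while the consumer is busy: only the
	// first is queued, the other two are coalesced away.
	fmt.Println(requestPrune(), requestPrune(), requestPrune()) // true false false

	// Consumer side (runLoop-style): drains the single pending signal and
	// would run one prune pass here.
	<-pruneTrigger
	fmt.Println("one prune pass covers all three requests")
}

The hi/lo split (defaults 50000/40000) means a triggered prune frees a batch of headroom down to the low watermark instead of oscillating right at the limit, since pruneCount is computed against maxTxPoolSizeLo.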


@@ -0,0 +1,36 @@
+package messagepool
+
+import (
+    "time"
+
+    "golang.org/x/xerrors"
+)
+
+func (mp *MessagePool) pruneExcessMessages() error {
+    start := time.Now()
+    defer func() {
+        log.Infow("message pruning complete", "took", time.Since(start))
+    }()
+
+    mp.lk.Lock()
+    defer mp.lk.Unlock()
+
+    if mp.currentSize < mp.maxTxPoolSizeHi {
+        return nil
+    }
+
+    pruneCount := mp.currentSize - mp.maxTxPoolSizeLo
+
+    // Step 1. Remove all 'future' messages (those with a nonce gap)
+    npruned, err := mp.pruneFutureMessages()
+    if err != nil {
+        return xerrors.Errorf("failed to prune future messages: %w", err)
+    }
+
+    pruneCount -= npruned
+    if pruneCount <= 0 {
+        return nil
+    }
+
+    // Step 2. prune messages with the lowest gas prices
+}
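As the WIP title suggests, the new file stops short: pruneFutureMessages is called but not included in this diff, and Step 2 is only a comment. Below is a hedged, self-contained sketch of how both steps could work, using simplified stand-in types rather than the real mpool structures; the helper names and the per-address layout are illustrative assumptions, not the commit's actual implementation.

package main

import (
	"fmt"
	"math/big"
	"sort"
)

// Stand-ins for the real mpool types, just enough to run the sketch: one
// pending set per address, keyed by nonce (the real pool stores
// *types.SignedMessage values inside per-address msgSets).
type msg struct {
	From     string
	Nonce    uint64
	GasPrice *big.Int
}

type pool struct {
	pending map[string]map[uint64]*msg
	size    int
}

func (p *pool) remove(m *msg) {
	delete(p.pending[m.From], m.Nonce)
	p.size--
}

// Step 1 (hypothetical pruneFutureMessages): per address, keep messages only
// while their nonces are contiguous from the lowest pending nonce; anything
// behind the first gap cannot be included yet, so it is dropped first. The
// real helper would start from the address's on-chain nonce instead.
func (p *pool) pruneFuture() int {
	pruned := 0
	for _, set := range p.pending {
		nonces := make([]uint64, 0, len(set))
		for n := range set {
			nonces = append(nonces, n)
		}
		sort.Slice(nonces, func(i, j int) bool { return nonces[i] < nonces[j] })
		for i := 1; i < len(nonces); i++ {
			if nonces[i] != nonces[i-1]+1 {
				for _, n := range nonces[i:] { // drop everything past the gap
					p.remove(set[n])
					pruned++
				}
				break
			}
		}
	}
	return pruned
}

// Step 2 (the part left unimplemented above): if still over budget, drop the
// cheapest messages until pruneCount is satisfied. A production version would
// also need to protect local messages and avoid opening new nonce gaps.
func (p *pool) pruneLowestGasPrice(pruneCount int) int {
	var all []*msg
	for _, set := range p.pending {
		for _, m := range set {
			all = append(all, m)
		}
	}
	sort.Slice(all, func(i, j int) bool { return all[i].GasPrice.Cmp(all[j].GasPrice) < 0 })
	pruned := 0
	for _, m := range all {
		if pruned >= pruneCount {
			break
		}
		p.remove(m)
		pruned++
	}
	return pruned
}

func main() {
	p := &pool{pending: map[string]map[uint64]*msg{}}
	add := func(from string, nonce uint64, price int64) {
		if p.pending[from] == nil {
			p.pending[from] = map[uint64]*msg{}
		}
		p.pending[from][nonce] = &msg{From: from, Nonce: nonce, GasPrice: big.NewInt(price)}
		p.size++
	}
	add("t1", 0, 5)
	add("t1", 1, 5)
	add("t1", 4, 9) // nonces 2 and 3 are missing: a "future" message
	add("t2", 0, 1)
	add("t2", 1, 2)

	fmt.Println("future pruned:", p.pruneFuture())         // 1
	fmt.Println("cheap pruned:", p.pruneLowestGasPrice(2)) // 2
	fmt.Println("remaining:", p.size)                      // 2
}

Ordering the two steps this way matches the commit's intent: nonce-gapped messages can never be mined until the gap fills, so they are the cheapest thing to evict before touching anything that is actually includable.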