mpool config

vyzo 2020-08-07 17:33:55 +03:00
parent 70f93da5dd
commit 42e1338ffe
5 changed files with 82 additions and 33 deletions

View File

@@ -0,0 +1,33 @@
package messagepool

import (
    "github.com/filecoin-project/lotus/chain/types"
)

var (
    ReplaceByFeeRatioDefault  = 1.25
    MemPoolSizeLimitHiDefault = 30000
    MemPoolSizeLimitLoDefault = 20000
)

func (mp *MessagePool) GetConfig() *types.MpoolConfig {
    mp.cfgLk.Lock()
    defer mp.cfgLk.Unlock()
    return mp.cfg.Clone()
}

func (mp *MessagePool) SetConfig(cfg *types.MpoolConfig) {
    cfg = cfg.Clone()
    mp.cfgLk.Lock()
    mp.cfg = cfg
    mp.rbfNum = types.NewInt(uint64((cfg.ReplaceByFeeRatio - 1) * RbfDenom))
    mp.cfgLk.Unlock()
}

func DefaultConfig() *types.MpoolConfig {
    return &types.MpoolConfig{
        SizeLimitHigh:     MemPoolSizeLimitHiDefault,
        SizeLimitLow:      MemPoolSizeLimitLoDefault,
        ReplaceByFeeRatio: ReplaceByFeeRatioDefault,
    }
}
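For context, a minimal usage sketch of the new accessors (not part of this commit; the helper name and the tuned values are illustrative only):

package main

import (
    "fmt"

    "github.com/filecoin-project/lotus/chain/messagepool"
)

// tightenLimits is a hypothetical helper showing the intended call pattern:
// read a clone of the live config, adjust it, and write it back.
func tightenLimits(mp *messagepool.MessagePool) {
    cfg := mp.GetConfig() // returns a clone, safe to mutate
    cfg.SizeLimitHigh = 10000
    cfg.SizeLimitLow = 8000
    cfg.ReplaceByFeeRatio = 1.5
    mp.SetConfig(cfg) // SetConfig clones again before storing
}

func main() {
    // DefaultConfig can be exercised without constructing a full MessagePool.
    def := messagepool.DefaultConfig()
    fmt.Println(def.SizeLimitHigh, def.SizeLimitLow, def.ReplaceByFeeRatio) // 30000 20000 1.25
}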

View File

@@ -40,17 +40,9 @@ var log = logging.Logger("messagepool")
const futureDebug = false
const ReplaceByFeeRatio = 1.25
const repubMsgLimit = 5
var MemPoolSizeLimitHiDefault = 30000
var MemPoolSizeLimitLoDefault = 20000
var (
rbfNum = types.NewInt(uint64((ReplaceByFeeRatio - 1) * 256))
rbfDenom = types.NewInt(256)
)
const RbfDenom = 256
var (
ErrMessageTooBig = errors.New("message too big")
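The float ratio is folded into integer arithmetic against this denominator. A standalone sketch of the conversion, with plain integers standing in for types.BigInt:

package main

import "fmt"

func main() {
    const RbfDenom = 256
    ratio := 1.25 // ReplaceByFeeRatioDefault

    // Same conversion as in SetConfig and New: the fractional part of the
    // ratio becomes an integer numerator over RbfDenom.
    num := uint64((ratio - 1) * RbfDenom)
    fmt.Println(num, "/", RbfDenom) // 64 / 256, i.e. 0.25 of the old premium
}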
@@ -85,13 +77,16 @@ type MessagePool struct {
curTsLk sync.Mutex // DO NOT LOCK INSIDE lk
curTs *types.TipSet
cfgLk sync.Mutex
cfg *types.MpoolConfig
rbfNum, rbfDenom types.BigInt
api Provider
minGasPrice types.BigInt
currentSize int
maxTxPoolSizeHi int
maxTxPoolSizeLo int
currentSize int
// pruneTrigger is a channel used to trigger a mempool pruning
pruneTrigger chan struct{}
@@ -118,7 +113,7 @@ func newMsgSet() *msgSet {
}
}
func (ms *msgSet) add(m *types.SignedMessage) (bool, error) {
func (ms *msgSet) add(m *types.SignedMessage, mp *MessagePool) (bool, error) {
if len(ms.msgs) == 0 || m.Message.Nonce >= ms.nextNonce {
ms.nextNonce = m.Message.Nonce + 1
}
@@ -127,7 +122,7 @@ func (ms *msgSet) add(m *types.SignedMessage) (bool, error) {
if m.Cid() != exms.Cid() {
// check if RBF passes
minPrice := exms.Message.GasPremium
minPrice = types.BigAdd(minPrice, types.BigDiv(types.BigMul(minPrice, rbfNum), rbfDenom))
minPrice = types.BigAdd(minPrice, types.BigDiv(types.BigMul(minPrice, mp.rbfNum), mp.rbfDenom))
minPrice = types.BigAdd(minPrice, types.NewInt(1))
if types.BigCmp(m.Message.GasPremium, minPrice) >= 0 {
log.Infow("add with RBF", "oldpremium", exms.Message.GasPremium,
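A worked example of the threshold this computes, using plain integers in place of types.BigInt (the old premium value is illustrative):

package main

import "fmt"

func main() {
    rbfNum, rbfDenom := int64(64), int64(256) // derived from ReplaceByFeeRatio = 1.25
    oldPremium := int64(1000)                 // premium of the message being replaced

    // minPrice = old + old*num/denom + 1, as in msgSet.add above.
    minPrice := oldPremium + oldPremium*rbfNum/rbfDenom + 1
    fmt.Println(minPrice) // 1251: the replacement must bid at least 25% more, plus 1
}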
@@ -211,22 +206,24 @@ func (mpp *mpoolProvider) ChainComputeBaseFee(ctx context.Context, ts *types.Tip
func New(api Provider, ds dtypes.MetadataDS, netName dtypes.NetworkName) (*MessagePool, error) {
cache, _ := lru.New2Q(build.BlsSignatureCacheSize)
verifcache, _ := lru.New2Q(build.VerifSigCacheSize)
cfg := DefaultConfig()
mp := &MessagePool{
closer: make(chan struct{}),
repubTk: build.Clock.Ticker(time.Duration(build.BlockDelaySecs) * 10 * time.Second),
localAddrs: make(map[address.Address]struct{}),
pending: make(map[address.Address]*msgSet),
minGasPrice: types.NewInt(0),
maxTxPoolSizeHi: MemPoolSizeLimitHiDefault,
maxTxPoolSizeLo: MemPoolSizeLimitLoDefault,
pruneTrigger: make(chan struct{}, 1),
blsSigCache: cache,
sigValCache: verifcache,
changes: lps.New(50),
localMsgs: namespace.Wrap(ds, datastore.NewKey(localMsgsDs)),
api: api,
netName: netName,
closer: make(chan struct{}),
repubTk: build.Clock.Ticker(time.Duration(build.BlockDelaySecs) * 10 * time.Second),
localAddrs: make(map[address.Address]struct{}),
pending: make(map[address.Address]*msgSet),
minGasPrice: types.NewInt(0),
pruneTrigger: make(chan struct{}, 1),
blsSigCache: cache,
sigValCache: verifcache,
changes: lps.New(50),
localMsgs: namespace.Wrap(ds, datastore.NewKey(localMsgsDs)),
api: api,
netName: netName,
cfg: cfg,
rbfNum: types.NewInt(uint64((cfg.ReplaceByFeeRatio - 1) * RbfDenom)),
rbfDenom: types.NewInt(RbfDenom),
}
if err := mp.loadLocal(); err != nil {
@@ -499,7 +496,7 @@ func (mp *MessagePool) addLocked(m *types.SignedMessage) error {
mp.pending[m.Message.From] = mset
}
incr, err := mset.add(m)
incr, err := mset.add(m, mp)
if err != nil {
log.Info(err)
return err // TODO(review): this error return was dropped at some point, was it on purpose?
@@ -507,7 +504,7 @@ func (mp *MessagePool) addLocked(m *types.SignedMessage) error {
if incr {
mp.currentSize++
if mp.currentSize > mp.maxTxPoolSizeHi {
if mp.currentSize > mp.cfg.SizeLimitHigh {
// send signal to prune messages if it hasn't already been sent
select {
case mp.pruneTrigger <- struct{}{}:
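The prune trigger send (truncated above) is the usual non-blocking send over a buffered channel. A standalone sketch of how it coalesces repeated signals:

package main

import "fmt"

func main() {
    pruneTrigger := make(chan struct{}, 1) // capacity 1, as in New above

    requestPrune := func() {
        select {
        case pruneTrigger <- struct{}{}:
            fmt.Println("prune scheduled")
        default:
            fmt.Println("prune already pending, signal dropped")
        }
    }

    requestPrune() // schedules a prune
    requestPrune() // coalesced: the buffer is full, nothing blocks

    <-pruneTrigger // the prune goroutine would consume the signal here
}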

View File

@@ -18,7 +18,7 @@ func (mp *MessagePool) pruneExcessMessages() error {
mp.lk.Lock()
defer mp.lk.Unlock()
if mp.currentSize < mp.maxTxPoolSizeHi {
if mp.currentSize < mp.cfg.SizeLimitHigh {
return nil
}
@@ -55,11 +55,12 @@ func (mp *MessagePool) pruneMessages(ctx context.Context, ts *types.TipSet) erro
})
// Keep messages (remove them from pruneMsgs) from chains while we are under the low water mark
loWaterMark := mp.cfg.SizeLimitLow
keepCount := 0
keepLoop:
for _, chain := range chains {
for _, m := range chain.msgs {
if keepCount < MemPoolSizeLimitLoDefault {
if keepCount < loWaterMark {
delete(pruneMsgs, m.Message.Cid())
keepCount++
} else {
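A self-contained model of the keep loop above, with string IDs standing in for message CIDs and toy chains; the gas-reward ordering of chains is assumed to have happened already:

package main

import "fmt"

func main() {
    loWaterMark := 3
    chains := [][]string{{"a", "b"}, {"c", "d"}, {"e"}}

    // Start with every message marked for pruning.
    pruneMsgs := map[string]bool{}
    for _, chain := range chains {
        for _, m := range chain {
            pruneMsgs[m] = true
        }
    }

    // Keep (un-mark) messages until the low water mark is reached.
    keepCount := 0
keepLoop:
    for _, chain := range chains {
        for _, m := range chain {
            if keepCount < loWaterMark {
                delete(pruneMsgs, m)
                keepCount++
            } else {
                break keepLoop
            }
        }
    }

    fmt.Println("still marked for pruning:", len(pruneMsgs)) // 2 of the 5 messages
}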

View File

@@ -81,7 +81,7 @@ func (mp *MessagePool) selectMessages(curTs, ts *types.TipSet) ([]*types.SignedM
// 3. Merge the head chains to produce the list of messages selected for inclusion, subject to
// the block gas limit.
result := make([]*types.SignedMessage, 0, mp.maxTxPoolSizeLo)
result := make([]*types.SignedMessage, 0, mp.cfg.SizeLimitLow)
gasLimit := int64(build.BlockGasLimit)
minGas := int64(gasguess.MinGas)
last := len(chains)

chain/types/mpool.go Normal file
View File

@@ -0,0 +1,18 @@
package types

import (
    "github.com/filecoin-project/go-address"
)

type MpoolConfig struct {
    PriorityAddrs     []address.Address
    SizeLimitHigh     int
    SizeLimitLow      int
    ReplaceByFeeRatio float64
}

func (mc *MpoolConfig) Clone() *MpoolConfig {
    r := new(MpoolConfig)
    *r = *mc
    return r
}
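Clone is a plain value copy. A short sketch of what that means for callers (not part of this commit; note the PriorityAddrs slice header is copied shallowly, so both configs share its backing array):

package main

import (
    "fmt"

    "github.com/filecoin-project/go-address"
    "github.com/filecoin-project/lotus/chain/types"
)

func main() {
    addr, _ := address.NewIDAddress(1000)
    orig := &types.MpoolConfig{
        PriorityAddrs:     []address.Address{addr},
        SizeLimitHigh:     30000,
        SizeLimitLow:      20000,
        ReplaceByFeeRatio: 1.25,
    }

    c := orig.Clone()
    c.SizeLimitHigh = 10000 // scalar fields are independent after Clone

    fmt.Println(orig.SizeLimitHigh, c.SizeLimitHigh) // 30000 10000
}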