Merge pull request #2896 from filecoin-project/feat/mpool-config-priority

message pool config and priority addresses
This commit is contained in:
Łukasz Magiera 2020-08-07 19:32:27 +02:00 committed by GitHub
commit 54a6d58c2e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 293 additions and 47 deletions

View File

@ -173,6 +173,11 @@ type FullNode interface {
MpoolGetNonce(context.Context, address.Address) (uint64, error)
MpoolSub(context.Context) (<-chan MpoolUpdate, error)
// MpoolGetConfig returns (a copy of) the current mpool config
MpoolGetConfig(context.Context) (*types.MpoolConfig, error)
// MpoolSetConfig sets the mpool config to (a copy of) the supplied config
MpoolSetConfig(context.Context, *types.MpoolConfig) error
// MethodGroup: Miner
MinerGetBaseInfo(context.Context, address.Address, abi.ChainEpoch, types.TipSetKey) (*MiningBaseInfo, error)

View File

@ -97,6 +97,8 @@ type FullNodeStruct struct {
SyncMarkBad func(ctx context.Context, bcid cid.Cid) error `perm:"admin"`
SyncCheckBad func(ctx context.Context, bcid cid.Cid) (string, error) `perm:"read"`
MpoolGetConfig func(context.Context) (*types.MpoolConfig, error) `perm:"read"`
MpoolSetConfig func(context.Context, *types.MpoolConfig) error `perm:"write"`
MpoolSelect func(context.Context, types.TipSetKey) ([]*types.SignedMessage, error) `perm:"read"`
MpoolPending func(context.Context, types.TipSetKey) ([]*types.SignedMessage, error) `perm:"read"`
MpoolPush func(context.Context, *types.SignedMessage) (cid.Cid, error) `perm:"write"`
@ -445,6 +447,14 @@ func (c *FullNodeStruct) GasEstimateGasLimit(ctx context.Context, msg *types.Mes
return c.Internal.GasEstimateGasLimit(ctx, msg, tsk)
}
func (c *FullNodeStruct) MpoolGetConfig(ctx context.Context) (*types.MpoolConfig, error) {
return c.Internal.MpoolGetConfig(ctx)
}
func (c *FullNodeStruct) MpoolSetConfig(ctx context.Context, cfg *types.MpoolConfig) error {
return c.Internal.MpoolSetConfig(ctx, cfg)
}
func (c *FullNodeStruct) MpoolSelect(ctx context.Context, tsk types.TipSetKey) ([]*types.SignedMessage, error) {
return c.Internal.MpoolSelect(ctx, tsk)
}

View File

@ -0,0 +1,73 @@
package messagepool
import (
"encoding/json"
"time"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/ipfs/go-datastore"
)
var (
ReplaceByFeeRatioDefault = 1.25
MemPoolSizeLimitHiDefault = 30000
MemPoolSizeLimitLoDefault = 20000
PruneCooldownDefault = time.Minute
ConfigKey = datastore.NewKey("/mpool/config")
)
func loadConfig(ds dtypes.MetadataDS) (*types.MpoolConfig, error) {
haveCfg, err := ds.Has(ConfigKey)
if err != nil {
return nil, err
}
if !haveCfg {
return DefaultConfig(), nil
}
cfgBytes, err := ds.Get(ConfigKey)
if err != nil {
return nil, err
}
cfg := new(types.MpoolConfig)
err = json.Unmarshal(cfgBytes, cfg)
return cfg, err
}
func saveConfig(cfg *types.MpoolConfig, ds dtypes.MetadataDS) error {
cfgBytes, err := json.Marshal(cfg)
if err != nil {
return err
}
return ds.Put(ConfigKey, cfgBytes)
}
func (mp *MessagePool) GetConfig() *types.MpoolConfig {
mp.cfgLk.Lock()
defer mp.cfgLk.Unlock()
return mp.cfg.Clone()
}
func (mp *MessagePool) SetConfig(cfg *types.MpoolConfig) {
cfg = cfg.Clone()
mp.cfgLk.Lock()
mp.cfg = cfg
mp.rbfNum = types.NewInt(uint64((cfg.ReplaceByFeeRatio - 1) * RbfDenom))
err := saveConfig(cfg, mp.ds)
if err != nil {
log.Warnf("error persisting mpool config: %s", err)
}
mp.cfgLk.Unlock()
}
func DefaultConfig() *types.MpoolConfig {
return &types.MpoolConfig{
SizeLimitHigh: MemPoolSizeLimitHiDefault,
SizeLimitLow: MemPoolSizeLimitLoDefault,
ReplaceByFeeRatio: ReplaceByFeeRatioDefault,
PruneCooldown: PruneCooldownDefault,
}
}

View File

@ -40,17 +40,9 @@ var log = logging.Logger("messagepool")
const futureDebug = false
const ReplaceByFeeRatio = 1.25
const repubMsgLimit = 5
var MemPoolSizeLimitHiDefault = 30000
var MemPoolSizeLimitLoDefault = 20000
var (
rbfNum = types.NewInt(uint64((ReplaceByFeeRatio - 1) * 256))
rbfDenom = types.NewInt(256)
)
const RbfDenom = 256
var (
ErrMessageTooBig = errors.New("message too big")
@ -75,6 +67,8 @@ const (
type MessagePool struct {
lk sync.Mutex
ds dtypes.MetadataDS
closer chan struct{}
repubTk *clock.Ticker
@ -85,17 +79,23 @@ type MessagePool struct {
curTsLk sync.Mutex // DO NOT LOCK INSIDE lk
curTs *types.TipSet
cfgLk sync.Mutex
cfg *types.MpoolConfig
rbfNum, rbfDenom types.BigInt
api Provider
minGasPrice types.BigInt
currentSize int
maxTxPoolSizeHi int
maxTxPoolSizeLo int
currentSize int
// pruneTrigger is a channel used to trigger a mempool pruning
pruneTrigger chan struct{}
// pruneCooldown is a channel used to allow a cooldown time between prunes
pruneCooldown chan struct{}
blsSigCache *lru.TwoQueueCache
changes *lps.PubSub
@ -118,7 +118,7 @@ func newMsgSet() *msgSet {
}
}
func (ms *msgSet) add(m *types.SignedMessage) (bool, error) {
func (ms *msgSet) add(m *types.SignedMessage, mp *MessagePool) (bool, error) {
if len(ms.msgs) == 0 || m.Message.Nonce >= ms.nextNonce {
ms.nextNonce = m.Message.Nonce + 1
}
@ -127,7 +127,7 @@ func (ms *msgSet) add(m *types.SignedMessage) (bool, error) {
if m.Cid() != exms.Cid() {
// check if RBF passes
minPrice := exms.Message.GasPremium
minPrice = types.BigAdd(minPrice, types.BigDiv(types.BigMul(minPrice, rbfNum), rbfDenom))
minPrice = types.BigAdd(minPrice, types.BigDiv(types.BigMul(minPrice, mp.rbfNum), mp.rbfDenom))
minPrice = types.BigAdd(minPrice, types.NewInt(1))
if types.BigCmp(m.Message.GasPremium, minPrice) >= 0 {
log.Infow("add with RBF", "oldpremium", exms.Message.GasPremium,
@ -212,23 +212,36 @@ func New(api Provider, ds dtypes.MetadataDS, netName dtypes.NetworkName) (*Messa
cache, _ := lru.New2Q(build.BlsSignatureCacheSize)
verifcache, _ := lru.New2Q(build.VerifSigCacheSize)
mp := &MessagePool{
closer: make(chan struct{}),
repubTk: build.Clock.Ticker(time.Duration(build.BlockDelaySecs) * 10 * time.Second),
localAddrs: make(map[address.Address]struct{}),
pending: make(map[address.Address]*msgSet),
minGasPrice: types.NewInt(0),
maxTxPoolSizeHi: MemPoolSizeLimitHiDefault,
maxTxPoolSizeLo: MemPoolSizeLimitLoDefault,
pruneTrigger: make(chan struct{}, 1),
blsSigCache: cache,
sigValCache: verifcache,
changes: lps.New(50),
localMsgs: namespace.Wrap(ds, datastore.NewKey(localMsgsDs)),
api: api,
netName: netName,
cfg, err := loadConfig(ds)
if err != nil {
if err != nil {
return nil, xerrors.Errorf("error loading mpool config: %w", err)
}
}
mp := &MessagePool{
ds: ds,
closer: make(chan struct{}),
repubTk: build.Clock.Ticker(time.Duration(build.BlockDelaySecs) * 10 * time.Second),
localAddrs: make(map[address.Address]struct{}),
pending: make(map[address.Address]*msgSet),
minGasPrice: types.NewInt(0),
pruneTrigger: make(chan struct{}, 1),
pruneCooldown: make(chan struct{}, 1),
blsSigCache: cache,
sigValCache: verifcache,
changes: lps.New(50),
localMsgs: namespace.Wrap(ds, datastore.NewKey(localMsgsDs)),
api: api,
netName: netName,
cfg: cfg,
rbfNum: types.NewInt(uint64((cfg.ReplaceByFeeRatio - 1) * RbfDenom)),
rbfDenom: types.NewInt(RbfDenom),
}
// enable initial prunes
mp.pruneCooldown <- struct{}{}
if err := mp.loadLocal(); err != nil {
log.Errorf("loading local messages: %+v", err)
}
@ -252,10 +265,12 @@ func (mp *MessagePool) Close() error {
}
func (mp *MessagePool) Prune() {
//so, its a single slot buffered channel. The first send fills the channel,
//the second send goes through when the pruning starts,
//and the third send goes through (and noops) after the pruning finishes
//and goes through the loop again
// this magic incantation of triggering prune thrice is here to make the Prune method
// synchronous:
// so, its a single slot buffered channel. The first send fills the channel,
// the second send goes through when the pruning starts,
// and the third send goes through (and noops) after the pruning finishes
// and goes through the loop again
mp.pruneTrigger <- struct{}{}
mp.pruneTrigger <- struct{}{}
mp.pruneTrigger <- struct{}{}
@ -499,7 +514,7 @@ func (mp *MessagePool) addLocked(m *types.SignedMessage) error {
mp.pending[m.Message.From] = mset
}
incr, err := mset.add(m)
incr, err := mset.add(m, mp)
if err != nil {
log.Info(err)
return err // TODO(review): this error return was dropped at some point, was it on purpose?
@ -507,7 +522,7 @@ func (mp *MessagePool) addLocked(m *types.SignedMessage) error {
if incr {
mp.currentSize++
if mp.currentSize > mp.maxTxPoolSizeHi {
if mp.currentSize > mp.cfg.SizeLimitHigh {
// send signal to prune messages if it hasnt already been sent
select {
case mp.pruneTrigger <- struct{}{}:

View File

@ -298,8 +298,8 @@ func TestPruningSimple(t *testing.T) {
}
}
mp.maxTxPoolSizeHi = 40
mp.maxTxPoolSizeLo = 10
mp.cfg.SizeLimitHigh = 40
mp.cfg.SizeLimitLow = 10
mp.Prune()

View File

@ -5,6 +5,7 @@ import (
"sort"
"time"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/chain/types"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"
@ -18,11 +19,21 @@ func (mp *MessagePool) pruneExcessMessages() error {
mp.lk.Lock()
defer mp.lk.Unlock()
if mp.currentSize < mp.maxTxPoolSizeHi {
if mp.currentSize < mp.cfg.SizeLimitHigh {
return nil
}
return mp.pruneMessages(context.TODO(), ts)
select {
case <-mp.pruneCooldown:
err := mp.pruneMessages(context.TODO(), ts)
go func() {
time.Sleep(mp.cfg.PruneCooldown)
mp.pruneCooldown <- struct{}{}
}()
return err
default:
return xerrors.New("cannot prune before cooldown")
}
}
func (mp *MessagePool) pruneMessages(ctx context.Context, ts *types.TipSet) error {
@ -38,10 +49,26 @@ func (mp *MessagePool) pruneMessages(ctx context.Context, ts *types.TipSet) erro
pending, _ := mp.getPendingMessages(ts, ts)
// priority actors -- not pruned
priority := make(map[address.Address]struct{})
for _, actor := range mp.cfg.PriorityAddrs {
priority[actor] = struct{}{}
}
// Collect all messages to track which ones to remove and create chains for block inclusion
pruneMsgs := make(map[cid.Cid]*types.SignedMessage, mp.currentSize)
keepCount := 0
var chains []*msgChain
for actor, mset := range pending {
// we never prune priority actors
_, keep := priority[actor]
if keep {
keepCount += len(mset)
continue
}
// not a priority actor, track the messages and create chains
for _, m := range mset {
pruneMsgs[m.Message.Cid()] = m
}
@ -55,11 +82,11 @@ func (mp *MessagePool) pruneMessages(ctx context.Context, ts *types.TipSet) erro
})
// Keep messages (remove them from pruneMsgs) from chains while we are under the low water mark
keepCount := 0
loWaterMark := mp.cfg.SizeLimitLow
keepLoop:
for _, chain := range chains {
for _, m := range chain.msgs {
if keepCount < MemPoolSizeLimitLoDefault {
if keepCount < loWaterMark {
delete(pruneMsgs, m.Message.Cid())
keepCount++
} else {

View File

@ -62,6 +62,15 @@ func (mp *MessagePool) selectMessages(curTs, ts *types.TipSet) ([]*types.SignedM
log.Infof("message selection took %s", time.Since(start))
}()
// 0b. Select all priority messages that fit in the block
minGas := int64(gasguess.MinGas)
result, gasLimit := mp.selectPriorityMessages(pending, baseFee, ts)
// have we filled the block?
if gasLimit < minGas {
return result, nil
}
// 1. Create a list of dependent message chains with maximal gas reward per limit consumed
var chains []*msgChain
for actor, mset := range pending {
@ -81,9 +90,6 @@ func (mp *MessagePool) selectMessages(curTs, ts *types.TipSet) ([]*types.SignedM
// 3. Merge the head chains to produce the list of messages selected for inclusion, subject to
// the block gas limit.
result := make([]*types.SignedMessage, 0, mp.maxTxPoolSizeLo)
gasLimit := int64(build.BlockGasLimit)
minGas := int64(gasguess.MinGas)
last := len(chains)
for i, chain := range chains {
// does it fit in the block?
@ -112,7 +118,7 @@ func (mp *MessagePool) selectMessages(curTs, ts *types.TipSet) ([]*types.SignedM
tailLoop:
for gasLimit >= minGas && last < len(chains) {
// trim
chains[last].Trim(gasLimit, mp, baseFee, ts)
chains[last].Trim(gasLimit, mp, baseFee, ts, false)
// push down if it hasn't been invalidated
if chains[last].valid {
@ -155,6 +161,86 @@ tailLoop:
return result, nil
}
func (mp *MessagePool) selectPriorityMessages(pending map[address.Address]map[uint64]*types.SignedMessage, baseFee types.BigInt, ts *types.TipSet) ([]*types.SignedMessage, int64) {
result := make([]*types.SignedMessage, 0, mp.cfg.SizeLimitLow)
gasLimit := int64(build.BlockGasLimit)
minGas := int64(gasguess.MinGas)
// 1. Get priority actor chains
var chains []*msgChain
priority := mp.cfg.PriorityAddrs
for _, actor := range priority {
mset, ok := pending[actor]
if ok {
// remove actor from pending set as we are already processed these messages
delete(pending, actor)
// create chains for the priority actor
next := mp.createMessageChains(actor, mset, baseFee, ts)
chains = append(chains, next...)
}
}
// 2. Sort the chains
sort.Slice(chains, func(i, j int) bool {
return chains[i].Before(chains[j])
})
// 3. Merge chains until the block limit; we are willing to include negative performing chains
// as these are messages from our own miners
last := len(chains)
for i, chain := range chains {
if chain.gasLimit <= gasLimit {
gasLimit -= chain.gasLimit
result = append(result, chain.msgs...)
continue
}
// we can't fit this chain because of block gasLimit -- we are at the edge
last = i
break
}
tailLoop:
for gasLimit >= minGas && last < len(chains) {
// trim, without discarding negative performing messages
chains[last].Trim(gasLimit, mp, baseFee, ts, true)
// push down if it hasn't been invalidated
if chains[last].valid {
for i := last; i < len(chains)-1; i++ {
if chains[i].Before(chains[i+1]) {
break
}
chains[i], chains[i+1] = chains[i+1], chains[i]
}
}
// select the next (valid and fitting) chain for inclusion
for i, chain := range chains[last:] {
// has the chain been invalidated
if !chain.valid {
continue
}
// does it fit in the bock?
if chain.gasLimit <= gasLimit {
gasLimit -= chain.gasLimit
result = append(result, chain.msgs...)
continue
}
// this chain needs to be trimmed
last += i
continue tailLoop
}
// the merge loop ended after processing all the chains and we probably still have gas to spare
// -- mark the end.
last = len(chains)
}
return result, gasLimit
}
func (mp *MessagePool) getPendingMessages(curTs, ts *types.TipSet) (map[address.Address]map[uint64]*types.SignedMessage, error) {
result := make(map[address.Address]map[uint64]*types.SignedMessage)
haveCids := make(map[cid.Cid]struct{})
@ -435,9 +521,9 @@ func (mc *msgChain) Before(other *msgChain) bool {
(mc.gasPerf == other.gasPerf && mc.gasReward.Cmp(other.gasReward) > 0)
}
func (mc *msgChain) Trim(gasLimit int64, mp *MessagePool, baseFee types.BigInt, ts *types.TipSet) {
func (mc *msgChain) Trim(gasLimit int64, mp *MessagePool, baseFee types.BigInt, ts *types.TipSet, priority bool) {
i := len(mc.msgs) - 1
for i >= 0 && (mc.gasLimit > gasLimit || mc.gasPerf < 0) {
for i >= 0 && (mc.gasLimit > gasLimit || (!priority && mc.gasPerf < 0)) {
gasLimit -= mc.msgs[i].Message.GasLimit
gasReward := mp.getGasReward(mc.msgs[i], baseFee, ts)
mc.gasReward = new(big.Int).Sub(mc.gasReward, gasReward)

21
chain/types/mpool.go Normal file
View File

@ -0,0 +1,21 @@
package types
import (
"time"
"github.com/filecoin-project/go-address"
)
type MpoolConfig struct {
PriorityAddrs []address.Address
SizeLimitHigh int
SizeLimitLow int
ReplaceByFeeRatio float64
PruneCooldown time.Duration
}
func (mc *MpoolConfig) Clone() *MpoolConfig {
r := new(MpoolConfig)
*r = *mc
return r
}

View File

@ -25,6 +25,15 @@ type MpoolAPI struct {
Mpool *messagepool.MessagePool
}
func (a *MpoolAPI) MpoolGetConfig(context.Context) (*types.MpoolConfig, error) {
return a.Mpool.GetConfig(), nil
}
func (a *MpoolAPI) MpoolSetConfig(ctx context.Context, cfg *types.MpoolConfig) error {
a.Mpool.SetConfig(cfg)
return nil
}
func (a *MpoolAPI) MpoolSelect(ctx context.Context, tsk types.TipSetKey) ([]*types.SignedMessage, error) {
ts, err := a.Chain.GetTipSetFromKey(tsk)
if err != nil {