Make mpool select only profitable messages

Signed-off-by: Jakub Sztandera <kubuxu@protocol.ai>
This commit is contained in:
Jakub Sztandera 2020-08-07 00:40:25 +02:00
parent 79b701d347
commit 2b2b632cd6
No known key found for this signature in database
GPG Key ID: 9A9AF56F8B3879BA
6 changed files with 53 additions and 21 deletions

View File

@ -154,6 +154,7 @@ type Provider interface {
MessagesForBlock(*types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error) MessagesForBlock(*types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error)
MessagesForTipset(*types.TipSet) ([]types.ChainMsg, error) MessagesForTipset(*types.TipSet) ([]types.ChainMsg, error)
LoadTipSet(tsk types.TipSetKey) (*types.TipSet, error) LoadTipSet(tsk types.TipSetKey) (*types.TipSet, error)
ChainComputeBaseFee(ctx context.Context, ts *types.TipSet) (types.BigInt, error)
} }
type mpoolProvider struct { type mpoolProvider struct {
@ -162,7 +163,7 @@ type mpoolProvider struct {
} }
func NewProvider(sm *stmgr.StateManager, ps *pubsub.PubSub) Provider { func NewProvider(sm *stmgr.StateManager, ps *pubsub.PubSub) Provider {
return &mpoolProvider{sm, ps} return &mpoolProvider{sm: sm, ps: ps}
} }
func (mpp *mpoolProvider) SubscribeHeadChanges(cb func(rev, app []*types.TipSet) error) *types.TipSet { func (mpp *mpoolProvider) SubscribeHeadChanges(cb func(rev, app []*types.TipSet) error) *types.TipSet {
@ -199,6 +200,14 @@ func (mpp *mpoolProvider) LoadTipSet(tsk types.TipSetKey) (*types.TipSet, error)
return mpp.sm.ChainStore().LoadTipSet(tsk) return mpp.sm.ChainStore().LoadTipSet(tsk)
} }
func (mpp *mpoolProvider) ChainComputeBaseFee(ctx context.Context, ts *types.TipSet) (types.BigInt, error) {
baseFee, err := mpp.sm.ChainStore().ComputeBaseFee(ctx, ts)
if err != nil {
return types.NewInt(0), xerrors.Errorf("computing base fee at %s: %w", ts, err)
}
return baseFee, nil
}
func New(api Provider, ds dtypes.MetadataDS, netName dtypes.NetworkName) (*MessagePool, error) { func New(api Provider, ds dtypes.MetadataDS, netName dtypes.NetworkName) (*MessagePool, error) {
cache, _ := lru.New2Q(build.BlsSignatureCacheSize) cache, _ := lru.New2Q(build.BlsSignatureCacheSize)
verifcache, _ := lru.New2Q(build.VerifSigCacheSize) verifcache, _ := lru.New2Q(build.VerifSigCacheSize)

View File

@ -140,6 +140,10 @@ func (tma *testMpoolAPI) LoadTipSet(tsk types.TipSetKey) (*types.TipSet, error)
return nil, fmt.Errorf("tipset not found") return nil, fmt.Errorf("tipset not found")
} }
func (tma *testMpoolAPI) ChainComputeBaseFee(ctx context.Context, ts *types.TipSet) (types.BigInt, error) {
return types.NewInt(100), nil
}
func assertNonce(t *testing.T, mp *MessagePool, addr address.Address, val uint64) { func assertNonce(t *testing.T, mp *MessagePool, addr address.Address, val uint64) {
t.Helper() t.Helper()
n, err := mp.GetNonce(addr) n, err := mp.GetNonce(addr)

View File

@ -7,6 +7,7 @@ import (
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"golang.org/x/xerrors"
) )
func (mp *MessagePool) pruneExcessMessages() error { func (mp *MessagePool) pruneExcessMessages() error {
@ -30,6 +31,11 @@ func (mp *MessagePool) pruneMessages(ctx context.Context, ts *types.TipSet) erro
log.Infof("message pruning took %s", time.Since(start)) log.Infof("message pruning took %s", time.Since(start))
}() }()
baseFee, err := mp.api.ChainComputeBaseFee(ctx, ts)
if err != nil {
return xerrors.Errorf("computing basefee: %w", err)
}
pending, _ := mp.getPendingMessages(ts, ts) pending, _ := mp.getPendingMessages(ts, ts)
// Collect all messages to track which ones to remove and create chains for block inclusion // Collect all messages to track which ones to remove and create chains for block inclusion
@ -39,7 +45,7 @@ func (mp *MessagePool) pruneMessages(ctx context.Context, ts *types.TipSet) erro
for _, m := range mset { for _, m := range mset {
pruneMsgs[m.Message.Cid()] = m pruneMsgs[m.Message.Cid()] = m
} }
actorChains := mp.createMessageChains(actor, mset, ts) actorChains := mp.createMessageChains(actor, mset, baseFee, ts)
chains = append(chains, actorChains...) chains = append(chains, actorChains...)
} }

View File

@ -1,6 +1,7 @@
package messagepool package messagepool
import ( import (
"context"
"math/big" "math/big"
"sort" "sort"
"time" "time"
@ -44,6 +45,11 @@ func (mp *MessagePool) selectMessages(curTs, ts *types.TipSet) ([]*types.SignedM
log.Infof("message selection took %s", time.Since(start)) log.Infof("message selection took %s", time.Since(start))
}() }()
baseFee, err := mp.api.ChainComputeBaseFee(context.TODO(), ts)
if err != nil {
return nil, xerrors.Errorf("computing basefee: %w", err)
}
// 0. Load messages for the target tipset; if it is the same as the current tipset in the mpool // 0. Load messages for the target tipset; if it is the same as the current tipset in the mpool
// then this is just the pending messages // then this is just the pending messages
pending, err := mp.getPendingMessages(curTs, ts) pending, err := mp.getPendingMessages(curTs, ts)
@ -54,7 +60,7 @@ func (mp *MessagePool) selectMessages(curTs, ts *types.TipSet) ([]*types.SignedM
// 1. Create a list of dependent message chains with maximal gas reward per limit consumed // 1. Create a list of dependent message chains with maximal gas reward per limit consumed
var chains []*msgChain var chains []*msgChain
for actor, mset := range pending { for actor, mset := range pending {
next := mp.createMessageChains(actor, mset, ts) next := mp.createMessageChains(actor, mset, baseFee, ts)
chains = append(chains, next...) chains = append(chains, next...)
} }
@ -62,6 +68,10 @@ func (mp *MessagePool) selectMessages(curTs, ts *types.TipSet) ([]*types.SignedM
sort.Slice(chains, func(i, j int) bool { sort.Slice(chains, func(i, j int) bool {
return chains[i].Before(chains[j]) return chains[i].Before(chains[j])
}) })
if len(chains) != 0 && chains[0].gasPerf < 0 {
log.Warnw("all messages in mpool have negative has performance", "bestGasPerf", chains[0].gasPerf)
return nil, nil
}
// 3. Merge the head chains to produce the list of messages selected for inclusion, subject to // 3. Merge the head chains to produce the list of messages selected for inclusion, subject to
// the block gas limit. // the block gas limit.
@ -91,7 +101,7 @@ func (mp *MessagePool) selectMessages(curTs, ts *types.TipSet) ([]*types.SignedM
tailLoop: tailLoop:
for gasLimit >= minGas && last < len(chains) { for gasLimit >= minGas && last < len(chains) {
// trim // trim
chains[last].Trim(gasLimit, mp, ts) chains[last].Trim(gasLimit, mp, baseFee, ts)
// push down if it hasn't been invalidated // push down if it hasn't been invalidated
if chains[last].valid { if chains[last].valid {
@ -231,8 +241,12 @@ func (mp *MessagePool) getPendingMessages(curTs, ts *types.TipSet) (map[address.
} }
} }
func (mp *MessagePool) getGasReward(msg *types.SignedMessage, ts *types.TipSet) *big.Int { func (mp *MessagePool) getGasReward(msg *types.SignedMessage, baseFee types.BigInt, ts *types.TipSet) *big.Int {
gasReward := abig.Mul(msg.Message.GasPremium, types.NewInt(uint64(msg.Message.GasLimit))) gasReward := abig.Mul(msg.Message.GasPremium, types.NewInt(uint64(msg.Message.GasLimit)))
maxReward := types.BigSub(msg.Message.GasFeeCap, baseFee)
if types.BigCmp(maxReward, gasReward) < 0 {
gasReward = maxReward
}
return gasReward.Int return gasReward.Int
} }
@ -245,7 +259,7 @@ func (mp *MessagePool) getGasPerf(gasReward *big.Int, gasLimit int64) float64 {
return r return r
} }
func (mp *MessagePool) createMessageChains(actor address.Address, mset map[uint64]*types.SignedMessage, ts *types.TipSet) []*msgChain { func (mp *MessagePool) createMessageChains(actor address.Address, mset map[uint64]*types.SignedMessage, baseFee types.BigInt, ts *types.TipSet) []*msgChain {
// collect all messages // collect all messages
msgs := make([]*types.SignedMessage, 0, len(mset)) msgs := make([]*types.SignedMessage, 0, len(mset))
for _, m := range mset { for _, m := range mset {
@ -307,7 +321,7 @@ func (mp *MessagePool) createMessageChains(actor address.Address, mset map[uint6
balance = new(big.Int).Sub(balance, value) balance = new(big.Int).Sub(balance, value)
} }
gasReward := mp.getGasReward(m, ts) gasReward := mp.getGasReward(m, baseFee, ts)
rewards = append(rewards, gasReward) rewards = append(rewards, gasReward)
} }
@ -404,11 +418,11 @@ func (mc *msgChain) Before(other *msgChain) bool {
(mc.gasPerf == other.gasPerf && mc.gasReward.Cmp(other.gasReward) > 0) (mc.gasPerf == other.gasPerf && mc.gasReward.Cmp(other.gasReward) > 0)
} }
func (mc *msgChain) Trim(gasLimit int64, mp *MessagePool, ts *types.TipSet) { func (mc *msgChain) Trim(gasLimit int64, mp *MessagePool, baseFee types.BigInt, ts *types.TipSet) {
i := len(mc.msgs) - 1 i := len(mc.msgs) - 1
for i >= 0 && mc.gasLimit > gasLimit { for i >= 0 && mc.gasLimit > gasLimit {
gasLimit -= mc.msgs[i].Message.GasLimit gasLimit -= mc.msgs[i].Message.GasLimit
gasReward := mp.getGasReward(mc.msgs[i], ts) gasReward := mp.getGasReward(mc.msgs[i], baseFee, ts)
mc.gasReward = new(big.Int).Sub(mc.gasReward, gasReward) mc.gasReward = new(big.Int).Sub(mc.gasReward, gasReward)
mc.gasLimit -= mc.msgs[i].Message.GasLimit mc.gasLimit -= mc.msgs[i].Message.GasLimit
if mc.gasLimit > 0 { if mc.gasLimit > 0 {

View File

@ -26,7 +26,7 @@ func makeTestMessage(w *wallet.Wallet, from, to address.Address, nonce uint64, g
Value: types.FromFil(0), Value: types.FromFil(0),
Nonce: nonce, Nonce: nonce,
GasLimit: gasLimit, GasLimit: gasLimit,
GasFeeCap: types.NewInt(gasPrice), GasFeeCap: types.NewInt(100 + gasPrice),
GasPremium: types.NewInt(gasPrice), GasPremium: types.NewInt(gasPrice),
} }
sig, err := w.Sign(context.TODO(), from, msg.Cid().Bytes()) sig, err := w.Sign(context.TODO(), from, msg.Cid().Bytes())
@ -90,8 +90,9 @@ func TestMessageChains(t *testing.T) {
m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(i+1)) m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(i+1))
mset[uint64(i)] = m mset[uint64(i)] = m
} }
baseFee := types.NewInt(0)
chains := mp.createMessageChains(a1, mset, ts) chains := mp.createMessageChains(a1, mset, baseFee, ts)
if len(chains) != 1 { if len(chains) != 1 {
t.Fatal("expected a single chain") t.Fatal("expected a single chain")
} }
@ -112,7 +113,7 @@ func TestMessageChains(t *testing.T) {
mset[uint64(i)] = m mset[uint64(i)] = m
} }
chains = mp.createMessageChains(a1, mset, ts) chains = mp.createMessageChains(a1, mset, baseFee, ts)
if len(chains) != 10 { if len(chains) != 10 {
t.Fatal("expected 10 chains") t.Fatal("expected 10 chains")
} }
@ -136,7 +137,7 @@ func TestMessageChains(t *testing.T) {
mset[uint64(i)] = m mset[uint64(i)] = m
} }
chains = mp.createMessageChains(a1, mset, ts) chains = mp.createMessageChains(a1, mset, baseFee, ts)
if len(chains) != 2 { if len(chains) != 2 {
t.Fatal("expected 1 chain") t.Fatal("expected 1 chain")
} }
@ -167,7 +168,7 @@ func TestMessageChains(t *testing.T) {
mset[uint64(i)] = m mset[uint64(i)] = m
} }
chains = mp.createMessageChains(a1, mset, ts) chains = mp.createMessageChains(a1, mset, baseFee, ts)
if len(chains) != 4 { if len(chains) != 4 {
t.Fatal("expected 4 chains") t.Fatal("expected 4 chains")
} }
@ -200,7 +201,7 @@ func TestMessageChains(t *testing.T) {
mset[uint64(i)] = m mset[uint64(i)] = m
} }
chains = mp.createMessageChains(a1, mset, ts) chains = mp.createMessageChains(a1, mset, baseFee, ts)
if len(chains) != 1 { if len(chains) != 1 {
t.Fatal("expected a single chain") t.Fatal("expected a single chain")
} }
@ -226,7 +227,7 @@ func TestMessageChains(t *testing.T) {
mset[uint64(i)] = m mset[uint64(i)] = m
} }
chains = mp.createMessageChains(a1, mset, ts) chains = mp.createMessageChains(a1, mset, baseFee, ts)
if len(chains) != 1 { if len(chains) != 1 {
t.Fatal("expected a single chain") t.Fatal("expected a single chain")
} }
@ -249,7 +250,7 @@ func TestMessageChains(t *testing.T) {
mset[uint64(i)] = makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(i+1)) mset[uint64(i)] = makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(i+1))
} }
chains = mp.createMessageChains(a1, mset, ts) chains = mp.createMessageChains(a1, mset, baseFee, ts)
if len(chains) != 1 { if len(chains) != 1 {
t.Fatal("expected a single chain") t.Fatal("expected a single chain")
} }
@ -263,14 +264,14 @@ func TestMessageChains(t *testing.T) {
} }
// test5: insufficient balance for all messages // test5: insufficient balance for all messages
tma.setBalanceRaw(a1, types.NewInt(uint64(3*gasLimit+1))) tma.setBalanceRaw(a1, types.NewInt(uint64((300)*gasLimit+1)))
mset = make(map[uint64]*types.SignedMessage) mset = make(map[uint64]*types.SignedMessage)
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
mset[uint64(i)] = makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(i+1)) mset[uint64(i)] = makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(i+1))
} }
chains = mp.createMessageChains(a1, mset, ts) chains = mp.createMessageChains(a1, mset, baseFee, ts)
if len(chains) != 1 { if len(chains) != 1 {
t.Fatalf("expected a single chain: got %d", len(chains)) t.Fatalf("expected a single chain: got %d", len(chains))
} }

View File

@ -34,8 +34,6 @@ type Message struct {
Value abi.TokenAmount Value abi.TokenAmount
// TODO: remove
// TODO: remove
GasLimit int64 GasLimit int64
GasFeeCap abi.TokenAmount GasFeeCap abi.TokenAmount
GasPremium abi.TokenAmount GasPremium abi.TokenAmount