Merge pull request #2838 from filecoin-project/feat/message-pool-selection

New message pool selection logic
This commit is contained in:
Łukasz Magiera 2020-08-06 20:19:35 +02:00 committed by GitHub
commit e54a87f91b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 967 additions and 729 deletions

View File

@ -147,6 +147,9 @@ type FullNode interface {
// MpoolPending returns pending mempool messages.
MpoolPending(context.Context, types.TipSetKey) ([]*types.SignedMessage, error)
// MpoolSelect returns a list of pending messages for inclusion in the next block
MpoolSelect(context.Context, types.TipSetKey) ([]*types.SignedMessage, error)
// MpoolPush pushes a signed message to mempool.
MpoolPush(context.Context, *types.SignedMessage) (cid.Cid, error)

View File

@ -94,6 +94,7 @@ type FullNodeStruct struct {
SyncMarkBad func(ctx context.Context, bcid cid.Cid) error `perm:"admin"`
SyncCheckBad func(ctx context.Context, bcid cid.Cid) (string, error) `perm:"read"`
MpoolSelect func(context.Context, types.TipSetKey) ([]*types.SignedMessage, error) `perm:"read"`
MpoolPending func(context.Context, types.TipSetKey) ([]*types.SignedMessage, error) `perm:"read"`
MpoolPush func(context.Context, *types.SignedMessage) (cid.Cid, error) `perm:"write"`
MpoolPushMessage func(context.Context, *types.Message) (*types.SignedMessage, error) `perm:"sign"`
@ -436,6 +437,10 @@ func (c *FullNodeStruct) GasEstimateGasLimit(ctx context.Context, msg *types.Mes
return c.Internal.GasEstimateGasLimit(ctx, msg, tsk)
}
func (c *FullNodeStruct) MpoolSelect(ctx context.Context, tsk types.TipSetKey) ([]*types.SignedMessage, error) {
return c.Internal.MpoolSelect(ctx, tsk)
}
func (c *FullNodeStruct) MpoolPending(ctx context.Context, tsk types.TipSetKey) ([]*types.SignedMessage, error) {
return c.Internal.MpoolPending(ctx, tsk)
}

View File

@ -18,6 +18,9 @@ type ActorLookup func(context.Context, address.Address, types.TipSetKey) (*types
const failedGasGuessRatio = 0.5
const failedGasGuessMax = 25_000_000
const MinGas = 1298450
const MaxGas = 1600271356
type CostKey struct {
Code cid.Cid
M abi.MethodNum

View File

@ -11,6 +11,7 @@ import (
"github.com/filecoin-project/lotus/chain/wallet"
_ "github.com/filecoin-project/lotus/lib/sigs/bls"
_ "github.com/filecoin-project/lotus/lib/sigs/secp"
"github.com/filecoin-project/specs-actors/actors/builtin"
"github.com/filecoin-project/specs-actors/actors/crypto"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
@ -26,6 +27,7 @@ type testMpoolAPI struct {
bmsgs map[cid.Cid][]*types.SignedMessage
statenonce map[address.Address]uint64
balance map[address.Address]types.BigInt
tipsets []*types.TipSet
}
@ -34,6 +36,7 @@ func newTestMpoolAPI() *testMpoolAPI {
return &testMpoolAPI{
bmsgs: make(map[cid.Cid][]*types.SignedMessage),
statenonce: make(map[address.Address]uint64),
balance: make(map[address.Address]types.BigInt),
}
}
@ -55,6 +58,14 @@ func (tma *testMpoolAPI) setStateNonce(addr address.Address, v uint64) {
tma.statenonce[addr] = v
}
func (tma *testMpoolAPI) setBalance(addr address.Address, v uint64) {
tma.balance[addr] = types.FromFil(v)
}
func (tma *testMpoolAPI) setBalanceRaw(addr address.Address, v types.BigInt) {
tma.balance[addr] = v
}
func (tma *testMpoolAPI) setBlockMessages(h *types.BlockHeader, msgs ...*types.SignedMessage) {
tma.bmsgs[h.Cid()] = msgs
tma.tipsets = append(tma.tipsets, mock.TipSet(h))
@ -74,9 +85,15 @@ func (tma *testMpoolAPI) PubSubPublish(string, []byte) error {
}
func (tma *testMpoolAPI) StateGetActor(addr address.Address, ts *types.TipSet) (*types.Actor, error) {
balance, ok := tma.balance[addr]
if !ok {
balance = types.NewInt(90000000)
tma.balance[addr] = balance
}
return &types.Actor{
Code: builtin.StorageMarketActorCodeID,
Nonce: tma.statenonce[addr],
Balance: types.NewInt(90000000),
Balance: balance,
}, nil
}

View File

@ -1,29 +1,15 @@
package messagepool
import (
"bytes"
"context"
big2 "math/big"
"sort"
"time"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/messagepool/gasguess"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/vm"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/abi/big"
"github.com/ipfs/go-cid"
)
func (mp *MessagePool) pruneExcessMessages() error {
start := time.Now()
defer func() {
log.Infow("message pruning complete", "took", time.Since(start))
}()
mp.curTsLk.Lock()
ts := mp.curTs
mp.curTsLk.Unlock()
@ -38,211 +24,48 @@ func (mp *MessagePool) pruneExcessMessages() error {
return mp.pruneMessages(context.TODO(), ts)
}
// just copied from miner/ SelectMessages
func (mp *MessagePool) pruneMessages(ctx context.Context, ts *types.TipSet) error {
al := func(ctx context.Context, addr address.Address, tsk types.TipSetKey) (*types.Actor, error) {
return mp.api.StateGetActor(addr, ts)
start := time.Now()
defer func() {
log.Infof("message pruning took %s", time.Since(start))
}()
pending, _ := mp.getPendingMessages(ts, ts)
// Collect all messages to track which ones to remove and create chains for block inclusion
pruneMsgs := make(map[cid.Cid]*types.SignedMessage, mp.currentSize)
var chains []*msgChain
for actor, mset := range pending {
for _, m := range mset {
pruneMsgs[m.Message.Cid()] = m
}
actorChains := mp.createMessageChains(actor, mset, ts)
chains = append(chains, actorChains...)
}
msgs := make([]*types.SignedMessage, 0, mp.currentSize)
for a := range mp.pending {
msgs = append(msgs, mp.pendingFor(a)...)
}
type senderMeta struct {
lastReward abi.TokenAmount
lastGasLimit int64
gasReward []abi.TokenAmount
gasLimit []int64
msgs []*types.SignedMessage
}
inclNonces := make(map[address.Address]uint64)
inclBalances := make(map[address.Address]big.Int)
outBySender := make(map[address.Address]*senderMeta)
tooLowFundMsgs := 0
tooHighNonceMsgs := 0
start := build.Clock.Now()
vmValid := time.Duration(0)
getbal := time.Duration(0)
guessGasDur := time.Duration(0)
sort.Slice(msgs, func(i, j int) bool {
return msgs[i].Message.Nonce < msgs[j].Message.Nonce
// Sort the chains
sort.Slice(chains, func(i, j int) bool {
return chains[i].Before(chains[j])
})
for _, msg := range msgs {
vmstart := build.Clock.Now()
minGas := vm.PricelistByEpoch(ts.Height()).OnChainMessage(msg.ChainLength()) // TODO: really should be doing just msg.ChainLength() but the sync side of this code doesnt seem to have access to that
if err := msg.VMMessage().ValidForBlockInclusion(minGas.Total()); err != nil {
log.Warnf("invalid message in message pool: %s", err)
continue
}
vmValid += build.Clock.Since(vmstart)
// TODO: this should be in some more general 'validate message' call
if msg.Message.GasLimit > build.BlockGasLimit {
log.Warnf("message in mempool had too high of a gas limit (%d)", msg.Message.GasLimit)
continue
}
if msg.Message.To == address.Undef {
log.Warnf("message in mempool had bad 'To' address")
continue
}
from := msg.Message.From
getBalStart := build.Clock.Now()
if _, ok := inclNonces[from]; !ok {
act, err := mp.api.StateGetActor(from, nil)
if err != nil {
log.Warnf("failed to check message sender balance, skipping message: %+v", err)
continue
}
inclNonces[from] = act.Nonce
inclBalances[from] = act.Balance
}
getbal += build.Clock.Since(getBalStart)
if inclBalances[from].LessThan(msg.Message.RequiredFunds()) {
tooLowFundMsgs++
// todo: drop from mpool
continue
}
if msg.Message.Nonce > inclNonces[from] {
tooHighNonceMsgs++
continue
}
if msg.Message.Nonce < inclNonces[from] {
continue
}
inclNonces[from] = msg.Message.Nonce + 1
inclBalances[from] = types.BigSub(inclBalances[from], msg.Message.RequiredFunds())
sm := outBySender[from]
if sm == nil {
sm = &senderMeta{
lastReward: big.Zero(),
}
}
sm.gasLimit = append(sm.gasLimit, sm.lastGasLimit+msg.Message.GasLimit)
sm.lastGasLimit = sm.gasLimit[len(sm.gasLimit)-1]
guessGasStart := build.Clock.Now()
guessedGas, err := gasguess.GuessGasUsed(ctx, types.EmptyTSK, msg, al)
guessGasDur += build.Clock.Since(guessGasStart)
if err != nil {
log.Infow("failed to guess gas", "to", msg.Message.To, "method", msg.Message.Method, "err", err)
}
estimatedReward := big.Mul(types.NewInt(uint64(guessedGas)), msg.Message.GasPrice)
sm.gasReward = append(sm.gasReward, big.Add(sm.lastReward, estimatedReward))
sm.lastReward = sm.gasReward[len(sm.gasReward)-1]
sm.msgs = append(sm.msgs, msg)
outBySender[from] = sm
}
orderedSenders := make([]address.Address, 0, len(outBySender))
for k := range outBySender {
orderedSenders = append(orderedSenders, k)
}
sort.Slice(orderedSenders, func(i, j int) bool {
return bytes.Compare(orderedSenders[i].Bytes(), orderedSenders[j].Bytes()) == -1
})
out := make([]*types.SignedMessage, 0, mp.maxTxPoolSizeLo)
{
for {
var bestSender address.Address
var nBest int
var bestGasToReward float64
// TODO: This is O(n^2)-ish, could use something like container/heap to cache this math
for _, sender := range orderedSenders {
meta, ok := outBySender[sender]
if !ok {
continue
}
for n := range meta.msgs {
if n+len(out) >= mp.maxTxPoolSizeLo {
break
}
gasToReward, _ := new(big2.Float).SetInt(meta.gasReward[n].Int).Float64()
gasToReward /= float64(meta.gasLimit[n])
if gasToReward >= bestGasToReward {
bestSender = sender
nBest = n + 1
bestGasToReward = gasToReward
}
}
}
if nBest == 0 {
break // block gas limit reached
}
{
out = append(out, outBySender[bestSender].msgs[:nBest]...)
outBySender[bestSender].msgs = outBySender[bestSender].msgs[nBest:]
outBySender[bestSender].gasLimit = outBySender[bestSender].gasLimit[nBest:]
outBySender[bestSender].gasReward = outBySender[bestSender].gasReward[nBest:]
if len(outBySender[bestSender].msgs) == 0 {
delete(outBySender, bestSender)
}
}
if len(out) >= mp.maxTxPoolSizeLo {
break
// Keep messages (remove them from pruneMsgs) from chains while we are under the low water mark
keepCount := 0
keepLoop:
for _, chain := range chains {
for _, m := range chain.msgs {
if keepCount < MemPoolSizeLimitLoDefault {
delete(pruneMsgs, m.Message.Cid())
keepCount++
} else {
break keepLoop
}
}
}
if tooLowFundMsgs > 0 {
log.Warnf("%d messages in mempool does not have enough funds", tooLowFundMsgs)
}
if tooHighNonceMsgs > 0 {
log.Warnf("%d messages in mempool had too high nonce", tooHighNonceMsgs)
}
sm := build.Clock.Now()
if sm.Sub(start) > time.Second {
log.Warnw("SelectMessages took a long time",
"duration", sm.Sub(start),
"vmvalidate", vmValid,
"getbalance", getbal,
"guessgas", guessGasDur,
"msgs", len(msgs))
}
good := make(map[cid.Cid]bool)
for _, m := range out {
good[m.Cid()] = true
}
for _, m := range msgs {
if !good[m.Cid()] {
mp.remove(m.Message.From, m.Message.Nonce)
}
// and remove all messages that are still in pruneMsgs after processing the chains
log.Infof("Pruning %d messages", len(pruneMsgs))
for _, m := range pruneMsgs {
mp.remove(m.Message.From, m.Message.Nonce)
}
return nil

View File

@ -0,0 +1,455 @@
package messagepool
import (
"context"
"math/big"
"sort"
"time"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/messagepool/gasguess"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/vm"
abig "github.com/filecoin-project/specs-actors/actors/abi/big"
"github.com/ipfs/go-cid"
)
var bigBlockGasLimit = big.NewInt(build.BlockGasLimit)
type msgChain struct {
msgs []*types.SignedMessage
gasReward *big.Int
gasLimit int64
gasPerf float64
valid bool
next *msgChain
}
func (mp *MessagePool) SelectMessages(ts *types.TipSet) ([]*types.SignedMessage, error) {
mp.curTsLk.Lock()
curTs := mp.curTs
mp.curTsLk.Unlock()
mp.lk.Lock()
defer mp.lk.Unlock()
return mp.selectMessages(curTs, ts)
}
func (mp *MessagePool) selectMessages(curTs, ts *types.TipSet) ([]*types.SignedMessage, error) {
start := time.Now()
defer func() {
log.Infof("message selection took %s", time.Since(start))
}()
// 0. Load messages for the target tipset; if it is the same as the current tipset in the mpool
// then this is just the pending messages
pending, err := mp.getPendingMessages(curTs, ts)
if err != nil {
return nil, err
}
// 1. Create a list of dependent message chains with maximal gas reward per limit consumed
var chains []*msgChain
for actor, mset := range pending {
next := mp.createMessageChains(actor, mset, ts)
chains = append(chains, next...)
}
// 2. Sort the chains
sort.Slice(chains, func(i, j int) bool {
return chains[i].Before(chains[j])
})
// 3. Merge the head chains to produce the list of messages selected for inclusion, subject to
// the block gas limit.
result := make([]*types.SignedMessage, 0, mp.maxTxPoolSizeLo)
gasLimit := int64(build.BlockGasLimit)
minGas := int64(gasguess.MinGas)
last := len(chains)
for i, chain := range chains {
// does it fit in the block?
if chain.gasLimit <= gasLimit {
gasLimit -= chain.gasLimit
result = append(result, chain.msgs...)
continue
}
// we can't fit this chain because of block gasLimit -- we are at the edge
last = i
break
}
// 4. We have reached the edge of what we can fit wholesale; if we still have available gasLimit
// to pack some more chains, then trim the last chain and push it down.
// Trimming invalidates subsequent dependent chains so that they can't be selected as their
// dependency cannot be (fully) included.
// We do this in a loop because the blocker might have been inordinately large and we might
// have to do it multiple times to satisfy tail packing.
tailLoop:
for gasLimit >= minGas && last < len(chains) {
// trim
chains[last].Trim(gasLimit, mp, ts)
// push down if it hasn't been invalidated
if chains[last].valid {
for i := last; i < len(chains)-1; i++ {
if chains[i].Before(chains[i+1]) {
break
}
chains[i], chains[i+1] = chains[i+1], chains[i]
}
}
// select the next (valid and fitting) chain for inclusion
for i, chain := range chains[last:] {
// has the chain been invalidated
if !chain.valid {
continue
}
// does it fit in the bock?
if chain.gasLimit <= gasLimit {
gasLimit -= chain.gasLimit
result = append(result, chain.msgs...)
continue
}
// this chain needs to be trimmed
last = i
continue tailLoop
}
// the merge loop ended after processing all the chains and we probably still have gas to spare
// -- mark the end.
last = len(chains)
}
return result, nil
}
func (mp *MessagePool) getPendingMessages(curTs, ts *types.TipSet) (map[address.Address]map[uint64]*types.SignedMessage, error) {
result := make(map[address.Address]map[uint64]*types.SignedMessage)
haveCids := make(map[cid.Cid]struct{})
// are we in sync?
inSync := false
if curTs.Height() == ts.Height() && curTs.Equals(ts) {
inSync = true
}
// first add our current pending messages
for a, mset := range mp.pending {
if inSync {
// no need to copy the map
result[a] = mset.msgs
} else {
// we need to copy the map to avoid clobbering it as we load more messages
msetCopy := make(map[uint64]*types.SignedMessage, len(mset.msgs))
for nonce, m := range mset.msgs {
msetCopy[nonce] = m
}
result[a] = msetCopy
// mark the messages as seen
for _, m := range mset.msgs {
haveCids[m.Cid()] = struct{}{}
}
}
}
// we are in sync, that's the happy path
if inSync {
return result, nil
}
// nope, we need to sync the tipsets
for {
if curTs.Height() == ts.Height() {
if curTs.Equals(ts) {
return result, nil
}
// different blocks in tipsets -- we mark them as seen so that they are not included in
// in the message set we return, but *neither me (vyzo) nor why understand why*
// this code is also probably completely untested in production, so I am adding a big fat
// warning to revisit this case and sanity check this decision.
log.Warnf("mpool tipset has same height as target tipset but it's not equal; beware of dragons!")
have, err := mp.MessagesForBlocks(ts.Blocks())
if err != nil {
return nil, xerrors.Errorf("error retrieving messages for tipset: %w", err)
}
for _, m := range have {
haveCids[m.Cid()] = struct{}{}
}
}
msgs, err := mp.MessagesForBlocks(ts.Blocks())
if err != nil {
return nil, xerrors.Errorf("error retrieving messages for tipset: %w", err)
}
for _, m := range msgs {
if _, have := haveCids[m.Cid()]; have {
continue
}
haveCids[m.Cid()] = struct{}{}
mset, ok := result[m.Message.From]
if !ok {
mset = make(map[uint64]*types.SignedMessage)
result[m.Message.From] = mset
}
other, dupNonce := mset[m.Message.Nonce]
if dupNonce {
// duplicate nonce, selfishly keep the message with the highest GasPrice
// if the gas prices are the same, keep the one with the highest GasLimit
switch m.Message.GasPrice.Int.Cmp(other.Message.GasPrice.Int) {
case 0:
if m.Message.GasLimit > other.Message.GasLimit {
mset[m.Message.Nonce] = m
}
case 1:
mset[m.Message.Nonce] = m
}
} else {
mset[m.Message.Nonce] = m
}
}
if curTs.Height() >= ts.Height() {
return result, nil
}
ts, err = mp.api.LoadTipSet(ts.Parents())
if err != nil {
return nil, xerrors.Errorf("error loading parent tipset: %w", err)
}
}
}
func (mp *MessagePool) getGasReward(msg *types.SignedMessage, ts *types.TipSet) *big.Int {
al := func(ctx context.Context, addr address.Address, tsk types.TipSetKey) (*types.Actor, error) {
return mp.api.StateGetActor(addr, ts)
}
gasUsed, err := gasguess.GuessGasUsed(context.TODO(), types.EmptyTSK, msg, al)
if err != nil {
gasUsed = int64(gasguess.MaxGas)
if gasUsed > msg.Message.GasLimit/2 {
gasUsed = msg.Message.GasLimit / 2
}
// if we start seeing this warning we may have a problem with spammers!
log.Warnf("Cannot guess gas usage for message: %s; using %d", err, gasUsed)
}
gasReward := abig.Mul(msg.Message.GasPrice, types.NewInt(uint64(gasUsed)))
return gasReward.Int
}
func (mp *MessagePool) getGasPerf(gasReward *big.Int, gasLimit int64) float64 {
// gasPerf = gasReward * build.BlockGasLimit / gasLimit
a := new(big.Rat).SetInt(new(big.Int).Mul(gasReward, bigBlockGasLimit))
b := big.NewRat(1, gasLimit)
c := new(big.Rat).Mul(a, b)
r, _ := c.Float64()
return r
}
func (mp *MessagePool) createMessageChains(actor address.Address, mset map[uint64]*types.SignedMessage, ts *types.TipSet) []*msgChain {
// collect all messages
msgs := make([]*types.SignedMessage, 0, len(mset))
for _, m := range mset {
msgs = append(msgs, m)
}
// sort by nonce
sort.Slice(msgs, func(i, j int) bool {
return msgs[i].Message.Nonce < msgs[j].Message.Nonce
})
// sanity checks:
// - there can be no gaps in nonces, starting from the current actor nonce
// if there is a gap, drop messages after the gap, we can't include them
// - all messages must have minimum gas and the total gas for the candidate messages
// cannot exceed the block limit; drop all messages that exceed the limit
// - the total gasReward cannot exceed the actor's balance; drop all messages that exceed
// the balance
a, _ := mp.api.StateGetActor(actor, ts)
curNonce := a.Nonce
balance := a.Balance.Int
gasLimit := int64(0)
i := 0
rewards := make([]*big.Int, 0, len(msgs))
for i = 0; i < len(msgs); i++ {
m := msgs[i]
if m.Message.Nonce < curNonce {
log.Warnf("encountered message from actor %s with nonce (%d) less than the current nonce (%d)",
actor, m.Message.Nonce, curNonce)
continue
}
if m.Message.Nonce != curNonce {
break
}
curNonce++
minGas := vm.PricelistByEpoch(ts.Height()).OnChainMessage(m.ChainLength()).Total()
if m.Message.GasLimit < minGas {
break
}
gasLimit += m.Message.GasLimit
if gasLimit > build.BlockGasLimit {
break
}
required := m.Message.RequiredFunds().Int
if balance.Cmp(required) < 0 {
break
}
balance = new(big.Int).Sub(balance, required)
value := m.Message.Value.Int
if balance.Cmp(value) >= 0 {
// Note: we only account for the value if the balance doesn't drop below 0
// otherwise the message will fail and the miner can reap the gas rewards
balance = new(big.Int).Sub(balance, value)
}
gasReward := mp.getGasReward(m, ts)
rewards = append(rewards, gasReward)
}
// check we have a sane set of messages to construct the chains
if i > 0 {
msgs = msgs[:i]
} else {
return nil
}
// ok, now we can construct the chains using the messages we have
// invariant: each chain has a bigger gasPerf than the next -- otherwise they can be merged
// and increase the gasPerf of the first chain
// We do this in two passes:
// - in the first pass we create chains that aggreagate messages with non-decreasing gasPerf
// - in the second pass we merge chains to maintain the invariant.
var chains []*msgChain
var curChain *msgChain
newChain := func(m *types.SignedMessage, i int) *msgChain {
chain := new(msgChain)
chain.msgs = []*types.SignedMessage{m}
chain.gasReward = rewards[i]
chain.gasLimit = m.Message.GasLimit
chain.gasPerf = mp.getGasPerf(chain.gasReward, chain.gasLimit)
chain.valid = true
return chain
}
// create the individual chains
for i, m := range msgs {
if curChain == nil {
curChain = newChain(m, i)
continue
}
gasReward := new(big.Int).Add(curChain.gasReward, rewards[i])
gasLimit := curChain.gasLimit + m.Message.GasLimit
gasPerf := mp.getGasPerf(gasReward, gasLimit)
// try to add the message to the current chain -- if it decreases the gasPerf, then make a
// new chain
if gasPerf < curChain.gasPerf {
chains = append(chains, curChain)
curChain = newChain(m, i)
} else {
curChain.msgs = append(curChain.msgs, m)
curChain.gasReward = gasReward
curChain.gasLimit = gasLimit
curChain.gasPerf = gasPerf
}
}
chains = append(chains, curChain)
// merge chains to maintain the invariant
for {
merged := 0
for i := len(chains) - 1; i > 0; i-- {
if chains[i].gasPerf >= chains[i-1].gasPerf {
chains[i-1].msgs = append(chains[i-1].msgs, chains[i].msgs...)
chains[i-1].gasReward = new(big.Int).Add(chains[i-1].gasReward, chains[i].gasReward)
chains[i-1].gasLimit += chains[i].gasLimit
chains[i-1].gasPerf = mp.getGasPerf(chains[i-1].gasReward, chains[i-1].gasLimit)
chains[i].valid = false
merged++
}
}
if merged == 0 {
break
}
// drop invalidated chains
newChains := make([]*msgChain, 0, len(chains)-merged)
for _, c := range chains {
if c.valid {
newChains = append(newChains, c)
}
}
chains = newChains
}
// link dependent chains
for i := 0; i < len(chains)-1; i++ {
chains[i].next = chains[i+1]
}
return chains
}
func (mc *msgChain) Before(other *msgChain) bool {
return mc.gasPerf > other.gasPerf ||
(mc.gasPerf == other.gasPerf && mc.gasReward.Cmp(other.gasReward) > 0)
}
func (mc *msgChain) Trim(gasLimit int64, mp *MessagePool, ts *types.TipSet) {
i := len(mc.msgs) - 1
for i >= 0 && mc.gasLimit > gasLimit {
gasLimit -= mc.msgs[i].Message.GasLimit
gasReward := mp.getGasReward(mc.msgs[i], ts)
mc.gasReward = new(big.Int).Sub(mc.gasReward, gasReward)
mc.gasLimit -= mc.msgs[i].Message.GasLimit
if mc.gasLimit > 0 {
mc.gasPerf = mp.getGasPerf(mc.gasReward, mc.gasLimit)
} else {
mc.gasPerf = 0
}
i--
}
if i < 0 {
mc.msgs = nil
mc.valid = false
} else {
mc.msgs = mc.msgs[:i+1]
}
if mc.next != nil {
mc.next.invalidate()
mc.next = nil
}
}
func (mc *msgChain) invalidate() {
mc.valid = false
mc.msgs = nil
if mc.next != nil {
mc.next.invalidate()
mc.next = nil
}
}

View File

@ -0,0 +1,429 @@
package messagepool
import (
"context"
"testing"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/messagepool/gasguess"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/types/mock"
"github.com/filecoin-project/lotus/chain/wallet"
"github.com/filecoin-project/specs-actors/actors/builtin"
"github.com/filecoin-project/specs-actors/actors/crypto"
"github.com/ipfs/go-datastore"
_ "github.com/filecoin-project/lotus/lib/sigs/bls"
_ "github.com/filecoin-project/lotus/lib/sigs/secp"
)
func makeTestMessage(w *wallet.Wallet, from, to address.Address, nonce uint64, gasLimit int64, gasPrice uint64) *types.SignedMessage {
msg := &types.Message{
From: from,
To: to,
Method: 2,
Value: types.FromFil(0),
Nonce: nonce,
GasLimit: gasLimit,
GasPrice: types.NewInt(gasPrice),
}
sig, err := w.Sign(context.TODO(), from, msg.Cid().Bytes())
if err != nil {
panic(err)
}
return &types.SignedMessage{
Message: *msg,
Signature: *sig,
}
}
func makeTestMpool() (*MessagePool, *testMpoolAPI) {
tma := newTestMpoolAPI()
ds := datastore.NewMapDatastore()
mp, err := New(tma, ds, "test")
if err != nil {
panic(err)
}
return mp, tma
}
func TestMessageChains(t *testing.T) {
mp, tma := makeTestMpool()
// the actors
w1, err := wallet.NewWallet(wallet.NewMemKeyStore())
if err != nil {
t.Fatal(err)
}
a1, err := w1.GenerateKey(crypto.SigTypeBLS)
if err != nil {
t.Fatal(err)
}
w2, err := wallet.NewWallet(wallet.NewMemKeyStore())
if err != nil {
t.Fatal(err)
}
a2, err := w2.GenerateKey(crypto.SigTypeBLS)
if err != nil {
t.Fatal(err)
}
block := mock.MkBlock(nil, 1, 1)
ts := mock.TipSet(block)
gasLimit := gasguess.Costs[gasguess.CostKey{builtin.StorageMarketActorCodeID, 2}]
tma.setBalance(a1, 1) // in FIL
// test chain aggregations
// test1: 10 messages from a1 to a2, with increasing gasPerf; it should
// make a single chain with 10 messages given enough balance
mset := make(map[uint64]*types.SignedMessage)
for i := 0; i < 10; i++ {
m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(i+1))
mset[uint64(i)] = m
}
chains := mp.createMessageChains(a1, mset, ts)
if len(chains) != 1 {
t.Fatal("expected a single chain")
}
if len(chains[0].msgs) != 10 {
t.Fatalf("expected 10 messages in the chain but got %d", len(chains[0].msgs))
}
for i, m := range chains[0].msgs {
if m.Message.Nonce != uint64(i) {
t.Fatalf("expected nonce %d but got %d", i, m.Message.Nonce)
}
}
// test2 : 10 messages from a1 to a2, with decreasing gasPerf; it should
// make 10 chains with 1 message each
mset = make(map[uint64]*types.SignedMessage)
for i := 0; i < 10; i++ {
m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(10-i))
mset[uint64(i)] = m
}
chains = mp.createMessageChains(a1, mset, ts)
if len(chains) != 10 {
t.Fatal("expected 10 chains")
}
for i, chain := range chains {
if len(chain.msgs) != 1 {
t.Fatalf("expected 1 message in chain %d but got %d", i, len(chain.msgs))
}
}
for i, chain := range chains {
m := chain.msgs[0]
if m.Message.Nonce != uint64(i) {
t.Fatalf("expected nonce %d but got %d", i, m.Message.Nonce)
}
}
// test3a: 10 messages from a1 to a2, with gasPerf increasing in groups of 3; it should
// merge them in two chains, one with 9 messages and one with the last message
mset = make(map[uint64]*types.SignedMessage)
for i := 0; i < 10; i++ {
m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(1+i%3))
mset[uint64(i)] = m
}
chains = mp.createMessageChains(a1, mset, ts)
if len(chains) != 2 {
t.Fatal("expected 1 chain")
}
if len(chains[0].msgs) != 9 {
t.Fatalf("expected 9 messages in the chain but got %d", len(chains[0].msgs))
}
if len(chains[1].msgs) != 1 {
t.Fatalf("expected 1 messages in the chain but got %d", len(chains[1].msgs))
}
nextNonce := 0
for _, chain := range chains {
for _, m := range chain.msgs {
if m.Message.Nonce != uint64(nextNonce) {
t.Fatalf("expected nonce %d but got %d", nextNonce, m.Message.Nonce)
}
nextNonce++
}
}
// test3b: 10 messages from a1 to a2, with gasPerf decreasing in groups of 3 with a bias for the
// earlier chains; it should make 4 chains, the first 3 with 3 messages and the last with
// a single message
mset = make(map[uint64]*types.SignedMessage)
for i := 0; i < 10; i++ {
bias := (12 - i) / 3
m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(1+i%3+bias))
mset[uint64(i)] = m
}
chains = mp.createMessageChains(a1, mset, ts)
if len(chains) != 4 {
t.Fatal("expected 4 chains")
}
for i, chain := range chains {
expectedLen := 3
if i > 2 {
expectedLen = 1
}
if len(chain.msgs) != expectedLen {
t.Fatalf("expected %d message in chain %d but got %d", expectedLen, i, len(chain.msgs))
}
}
nextNonce = 0
for _, chain := range chains {
for _, m := range chain.msgs {
if m.Message.Nonce != uint64(nextNonce) {
t.Fatalf("expected nonce %d but got %d", nextNonce, m.Message.Nonce)
}
nextNonce++
}
}
// test chain breaks
// test4: 10 messages with non-consecutive nonces; it should make a single chain with just
// the first message
mset = make(map[uint64]*types.SignedMessage)
for i := 0; i < 10; i++ {
m := makeTestMessage(w1, a1, a2, uint64(i*2), gasLimit, uint64(i+1))
mset[uint64(i)] = m
}
chains = mp.createMessageChains(a1, mset, ts)
if len(chains) != 1 {
t.Fatal("expected a single chain")
}
if len(chains[0].msgs) != 1 {
t.Fatalf("expected 1 message in the chain but got %d", len(chains[0].msgs))
}
for i, m := range chains[0].msgs {
if m.Message.Nonce != uint64(i) {
t.Fatalf("expected nonce %d but got %d", i, m.Message.Nonce)
}
}
// test5: 10 messages with increasing gasLimit, except for the 6th message which has less than
// the epoch gasLimit; it should create a single chain with the first 5 messages
mset = make(map[uint64]*types.SignedMessage)
for i := 0; i < 10; i++ {
var m *types.SignedMessage
if i != 5 {
m = makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(i+1))
} else {
m = makeTestMessage(w1, a1, a2, uint64(i), 1, uint64(i+1))
}
mset[uint64(i)] = m
}
chains = mp.createMessageChains(a1, mset, ts)
if len(chains) != 1 {
t.Fatal("expected a single chain")
}
if len(chains[0].msgs) != 5 {
t.Fatalf("expected 5 message in the chain but got %d", len(chains[0].msgs))
}
for i, m := range chains[0].msgs {
if m.Message.Nonce != uint64(i) {
t.Fatalf("expected nonce %d but got %d", i, m.Message.Nonce)
}
}
// test6: one more message than what can fit in a block according to gas limit, with increasing
// gasPerf; it should create a single chain with the max messages
maxMessages := int(build.BlockGasLimit / gasLimit)
nMessages := maxMessages + 1
mset = make(map[uint64]*types.SignedMessage)
for i := 0; i < nMessages; i++ {
mset[uint64(i)] = makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(i+1))
}
chains = mp.createMessageChains(a1, mset, ts)
if len(chains) != 1 {
t.Fatal("expected a single chain")
}
if len(chains[0].msgs) != maxMessages {
t.Fatalf("expected %d message in the chain but got %d", maxMessages, len(chains[0].msgs))
}
for i, m := range chains[0].msgs {
if m.Message.Nonce != uint64(i) {
t.Fatalf("expected nonce %d but got %d", i, m.Message.Nonce)
}
}
// test5: insufficient balance for all messages
tma.setBalanceRaw(a1, types.NewInt(uint64(3*gasLimit+1)))
mset = make(map[uint64]*types.SignedMessage)
for i := 0; i < 10; i++ {
mset[uint64(i)] = makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(i+1))
}
chains = mp.createMessageChains(a1, mset, ts)
if len(chains) != 1 {
t.Fatal("expected a single chain")
}
if len(chains[0].msgs) != 2 {
t.Fatalf("expected %d message in the chain but got %d", 2, len(chains[0].msgs))
}
for i, m := range chains[0].msgs {
if m.Message.Nonce != uint64(i) {
t.Fatalf("expected nonce %d but got %d", i, m.Message.Nonce)
}
}
}
func TestBasicMessageSelection(t *testing.T) {
mp, tma := makeTestMpool()
// the actors
w1, err := wallet.NewWallet(wallet.NewMemKeyStore())
if err != nil {
t.Fatal(err)
}
a1, err := w1.GenerateKey(crypto.SigTypeBLS)
if err != nil {
t.Fatal(err)
}
w2, err := wallet.NewWallet(wallet.NewMemKeyStore())
if err != nil {
t.Fatal(err)
}
a2, err := w2.GenerateKey(crypto.SigTypeBLS)
if err != nil {
t.Fatal(err)
}
block := mock.MkBlock(nil, 1, 1)
ts := mock.TipSet(block)
tma.applyBlock(t, block)
gasLimit := gasguess.Costs[gasguess.CostKey{builtin.StorageMarketActorCodeID, 2}]
tma.setBalance(a1, 1) // in FIL
tma.setBalance(a2, 1) // in FIL
// we create 10 messages from each actor to another, with the first actor paying higher
// gas prices than the second; we expect message selection to order his messages first
for i := 0; i < 10; i++ {
m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(2*i+1))
mustAdd(t, mp, m)
}
for i := 0; i < 10; i++ {
m := makeTestMessage(w2, a2, a1, uint64(i), gasLimit, uint64(i+1))
mustAdd(t, mp, m)
}
msgs, err := mp.SelectMessages(ts)
if err != nil {
t.Fatal(err)
}
if len(msgs) != 20 {
t.Fatalf("exptected 20 messages, got %d", len(msgs))
}
nextNonce := 0
for i := 0; i < 10; i++ {
if msgs[i].Message.From != a1 {
t.Fatalf("expected message from actor a1")
}
if msgs[i].Message.Nonce != uint64(nextNonce) {
t.Fatalf("expected nonce %d, got %d", msgs[i].Message.Nonce, nextNonce)
}
nextNonce++
}
nextNonce = 0
for i := 10; i < 20; i++ {
if msgs[i].Message.From != a2 {
t.Fatalf("expected message from actor a2")
}
if msgs[i].Message.Nonce != uint64(nextNonce) {
t.Fatalf("expected nonce %d, got %d", msgs[i].Message.Nonce, nextNonce)
}
nextNonce++
}
// now we make a block with all the messages and advance the chain
block2 := mock.MkBlock(ts, 2, 2)
tma.setBlockMessages(block2, msgs...)
tma.applyBlock(t, block2)
// we should have no pending messages in the mpool
pend, ts2 := mp.Pending()
if len(pend) != 0 {
t.Fatalf("expected no pending messages, but got %d", len(pend))
}
// create a block and advance the chain without applying to the mpool
msgs = nil
for i := 10; i < 20; i++ {
m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(2*i+1))
msgs = append(msgs, m)
m = makeTestMessage(w2, a2, a1, uint64(i), gasLimit, uint64(i+1))
msgs = append(msgs, m)
}
block3 := mock.MkBlock(ts2, 3, 3)
tma.setBlockMessages(block3, msgs...)
ts3 := mock.TipSet(block3)
// now create another set of messages and add them to the mpool
for i := 20; i < 30; i++ {
m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(2*i+1))
mustAdd(t, mp, m)
m = makeTestMessage(w2, a2, a1, uint64(i), gasLimit, uint64(i+1))
mustAdd(t, mp, m)
}
// select messages in the last tipset; this should include the missed messages as well as
// the last messages we added, with the first actor's messages first
// first we need to update the nonce on the tma
tma.setStateNonce(a1, 10)
tma.setStateNonce(a2, 10)
msgs, err = mp.SelectMessages(ts3)
if err != nil {
t.Fatal(err)
}
if len(msgs) != 40 {
t.Fatalf("expected 40 messages, got %d", len(msgs))
}
nextNonce = 10
for i := 0; i < 20; i++ {
if msgs[i].Message.From != a1 {
t.Fatalf("expected message from actor a1")
}
if msgs[i].Message.Nonce != uint64(nextNonce) {
t.Fatalf("expected nonce %d, got %d", msgs[i].Message.Nonce, nextNonce)
}
nextNonce++
}
nextNonce = 10
for i := 20; i < 40; i++ {
if msgs[i].Message.From != a2 {
t.Fatalf("expected message from actor a2")
}
if msgs[i].Message.Nonce != uint64(nextNonce) {
t.Fatalf("expected nonce %d, got %d", msgs[i].Message.Nonce, nextNonce)
}
nextNonce++
}
}

View File

@ -36,7 +36,6 @@ import (
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/miner"
)
var stateCmd = &cli.Command{
@ -857,12 +856,7 @@ var stateComputeStateCmd = &cli.Command{
var msgs []*types.Message
if cctx.Bool("apply-mpool-messages") {
pmsgs, err := api.MpoolPending(ctx, ts.Key())
if err != nil {
return err
}
pmsgs, err = miner.SelectMessages(ctx, api.StateGetActor, ts, pmsgs)
pmsgs, err := api.MpoolSelect(ctx, ts.Key())
if err != nil {
return err
}
@ -942,7 +936,7 @@ var compStateTemplate = `
font-size: 12px;
border-collapse: collapse;
}
tr {
tr {
border-top: 1px solid black;
border-bottom: 1px solid black;
}
@ -961,7 +955,7 @@ var compStateTemplate = `
/**
Checked State
**/
.ellipsis-toggle input:checked + .ellipsis {
display: none;
}
@ -1014,7 +1008,7 @@ var compStateMsg = `
<div><pre class="params">{{JsonParams ($code) (.Msg.Method) (.Msg.Params) | html}}</pre></div>
{{end}}
<div><span class="slow-{{IsSlow .Duration}}-{{IsVerySlow .Duration}}">Took {{.Duration}}</span>, <span class="exit{{IntExit .MsgRct.ExitCode}}">Exit: <b>{{.MsgRct.ExitCode}}</b></span>{{if gt (len .MsgRct.Return) 0}}, Return{{end}}</div>
{{if gt (len .MsgRct.Return) 0}}
<div><pre class="ret">{{JsonReturn ($code) (.Msg.Method) (.MsgRct.Return) | html}}</pre></div>
{{end}}

View File

@ -6,7 +6,6 @@ import (
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/types"
lcli "github.com/filecoin-project/lotus/cli"
"github.com/filecoin-project/lotus/miner"
"github.com/urfave/cli/v2"
)
@ -35,18 +34,13 @@ var minerSelectMsgsCmd = &cli.Command{
return err
}
msgs, err := api.MpoolPending(ctx, head.Key())
if err != nil {
return err
}
filtered, err := miner.SelectMessages(ctx, api.StateGetActor, head, msgs)
msgs, err := api.MpoolSelect(ctx, head.Key())
if err != nil {
return err
}
var totalGas int64
for i, f := range filtered {
for i, f := range msgs {
from := f.Message.From.String()
if len(from) > 8 {
from = "..." + from[len(from)-8:]
@ -61,8 +55,7 @@ var minerSelectMsgsCmd = &cli.Command{
totalGas += f.Message.GasLimit
}
fmt.Println("mempool input messages: ", len(msgs))
fmt.Println("filtered messages: ", len(filtered))
fmt.Println("selected messages: ", len(msgs))
fmt.Printf("total gas limit of selected messages: %d / %d (%0.2f%%)\n", totalGas, build.BlockGasLimit, 100*float64(totalGas)/float64(build.BlockGasLimit))
return nil
},

View File

@ -12,7 +12,6 @@ import (
"github.com/filecoin-project/lotus/chain/gen"
"github.com/filecoin-project/lotus/chain/types"
lcli "github.com/filecoin-project/lotus/cli"
"github.com/filecoin-project/lotus/miner"
"github.com/filecoin-project/specs-actors/actors/crypto"
"golang.org/x/xerrors"
@ -34,20 +33,11 @@ func init() {
if err != nil {
return err
}
pending, err := api.MpoolPending(ctx, head.Key())
msgs, err := api.MpoolSelect(ctx, head.Key())
if err != nil {
return err
}
msgs, err := miner.SelectMessages(ctx, api.StateGetActor, head, pending)
if err != nil {
return err
}
if len(msgs) > build.BlockMessageLimit {
log.Error("SelectMessages returned too many messages: ", len(msgs))
msgs = msgs[:build.BlockMessageLimit]
}
addr, _ := address.NewIDAddress(1000)
var ticket *types.Ticket
{

View File

@ -356,15 +356,15 @@ func (m *Miner) mineOne(ctx context.Context, base *MiningBase) (*types.BlockMsg,
}
// get pending messages early,
pending, err := m.api.MpoolPending(context.TODO(), base.TipSet.Key())
msgs, err := m.api.MpoolSelect(context.TODO(), base.TipSet.Key())
if err != nil {
return nil, xerrors.Errorf("failed to get pending messages: %w", err)
return nil, xerrors.Errorf("failed to select messages for block: %w", err)
}
tPending := build.Clock.Now()
// TODO: winning post proof
b, err := m.createBlock(base, m.address, ticket, winner, bvals, postProof, pending)
b, err := m.createBlock(base, m.address, ticket, winner, bvals, postProof, msgs)
if err != nil {
return nil, xerrors.Errorf("failed to create block: %w", err)
}
@ -421,17 +421,7 @@ func (m *Miner) computeTicket(ctx context.Context, brand *types.BeaconEntry, bas
}
func (m *Miner) createBlock(base *MiningBase, addr address.Address, ticket *types.Ticket,
eproof *types.ElectionProof, bvals []types.BeaconEntry, wpostProof []abi.PoStProof, pending []*types.SignedMessage) (*types.BlockMsg, error) {
msgs, err := SelectMessages(context.TODO(), m.api.StateGetActor, base.TipSet, pending)
if err != nil {
return nil, xerrors.Errorf("message filtering failed: %w", err)
}
if len(msgs) > build.BlockMessageLimit {
log.Error("SelectMessages returned too many messages: ", len(msgs))
msgs = msgs[:build.BlockMessageLimit]
}
eproof *types.ElectionProof, bvals []types.BeaconEntry, wpostProof []abi.PoStProof, msgs []*types.SignedMessage) (*types.BlockMsg, error) {
uts := base.TipSet.MinTimestamp() + build.BlockDelaySecs*(uint64(base.NullRounds)+1)
nheight := base.TipSet.Height() + base.NullRounds + 1

View File

@ -1,243 +0,0 @@
package miner
import (
"context"
"testing"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/specs-actors/actors/builtin"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/messagepool/gasguess"
"github.com/filecoin-project/lotus/chain/types"
"github.com/stretchr/testify/assert"
)
func mustIDAddr(i uint64) address.Address {
a, err := address.NewIDAddress(i)
if err != nil {
panic(err)
}
return a
}
func TestSelectNotOverLimited(t *testing.T) {
ctx := context.TODO()
a1 := mustIDAddr(1)
a2 := mustIDAddr(2)
a3 := mustIDAddr(3)
actors := map[address.Address]*types.Actor{
a1: {
Code: builtin.AccountActorCodeID,
Nonce: 1,
Balance: types.FromFil(1200),
},
a2: {
Code: builtin.AccountActorCodeID,
Nonce: 1,
Balance: types.FromFil(1200),
},
a3: {
Code: builtin.StorageMinerActorCodeID,
Nonce: 0,
Balance: types.FromFil(1000),
},
}
af := func(ctx context.Context, addr address.Address, tsk types.TipSetKey) (*types.Actor, error) {
return actors[addr], nil
}
gasUsed := gasguess.Costs[gasguess.CostKey{builtin.StorageMinerActorCodeID, 4}]
var goodMsgs []types.Message
for i := int64(0); i < build.BlockGasLimit/gasUsed+10; i++ {
goodMsgs = append(goodMsgs, types.Message{
From: a1,
To: a3,
Method: 4,
Nonce: uint64(1 + i),
Value: types.FromFil(0),
GasLimit: gasUsed + 1000,
GasPrice: types.NewInt(1),
})
}
var badMsgs []types.Message
for i := int64(0); i < build.BlockGasLimit/gasUsed+10; i++ {
badMsgs = append(badMsgs, types.Message{
From: a2,
To: a3,
Method: 4,
Nonce: uint64(1 + i),
Value: types.FromFil(0),
GasLimit: 10 * gasUsed,
GasPrice: types.NewInt(9),
})
}
outmsgs, err := SelectMessages(ctx, af, &types.TipSet{}, wrapMsgs(append(goodMsgs, badMsgs...)))
if err != nil {
t.Fatal(err)
}
for _, v := range outmsgs {
if v.Message.From == a2 {
t.Errorf("included bad actor message: %v", v)
}
}
}
func TestMessageFiltering(t *testing.T) {
ctx := context.TODO()
a1 := mustIDAddr(1)
a2 := mustIDAddr(2)
actors := map[address.Address]*types.Actor{
a1: {
Nonce: 3,
Balance: types.FromFil(1200),
},
a2: {
Nonce: 1,
Balance: types.FromFil(1000),
},
}
af := func(ctx context.Context, addr address.Address, tsk types.TipSetKey) (*types.Actor, error) {
return actors[addr], nil
}
msgs := []types.Message{
{
From: a1,
To: a1,
Nonce: 3,
Value: types.FromFil(500),
GasLimit: 100_000_000,
GasPrice: types.NewInt(1),
},
{
From: a1,
To: a1,
Nonce: 4,
Value: types.FromFil(500),
GasLimit: 100_000_000,
GasPrice: types.NewInt(1),
},
{
From: a2,
To: a1,
Nonce: 1,
Value: types.FromFil(800),
GasLimit: 100_000_000,
GasPrice: types.NewInt(1),
},
{
From: a2,
To: a1,
Nonce: 0,
Value: types.FromFil(800),
GasLimit: 100_000_000,
GasPrice: types.NewInt(1),
},
{
From: a2,
To: a1,
Nonce: 2,
Value: types.FromFil(150),
GasLimit: 100,
GasPrice: types.NewInt(1),
},
}
outmsgs, err := SelectMessages(ctx, af, &types.TipSet{}, wrapMsgs(msgs))
if err != nil {
t.Fatal(err)
}
assert.Len(t, outmsgs, 3, "filtering didnt work as expected")
was, expected := outmsgs[0].Message, msgs[2]
if was.From != expected.From || was.Nonce != expected.Nonce {
t.Fatal("filtering bad")
}
}
func wrapMsgs(msgs []types.Message) []*types.SignedMessage {
var out []*types.SignedMessage
for _, m := range msgs {
out = append(out, &types.SignedMessage{Message: m})
}
return out
}
func TestMessageFilteringSenderHopping(t *testing.T) {
ctx := context.TODO()
a1 := mustIDAddr(1)
a2 := mustIDAddr(2)
actors := map[address.Address]*types.Actor{
a1: {
Nonce: 0,
Balance: types.FromFil(1200),
},
a2: {
Nonce: 0,
Balance: types.FromFil(1000),
},
}
af := func(ctx context.Context, addr address.Address, tsk types.TipSetKey) (*types.Actor, error) {
return actors[addr], nil
}
msgs := []types.Message{
{
From: a1,
To: a1,
Nonce: 0,
Value: types.FromFil(500),
GasLimit: 2_000_000_000,
GasPrice: types.NewInt(100),
},
{
From: a1,
To: a1,
Nonce: 1,
Value: types.FromFil(500),
GasLimit: 2_000_000_000,
GasPrice: types.NewInt(1),
},
{
From: a2,
To: a1,
Nonce: 0,
Value: types.FromFil(1),
GasLimit: 1_000_000_000,
GasPrice: types.NewInt(10),
},
{
From: a2,
To: a1,
Nonce: 1,
Value: types.FromFil(1),
GasLimit: 1_000_000_000,
GasPrice: types.NewInt(1),
},
{
From: a2,
To: a1,
Nonce: 2,
Value: types.FromFil(1),
GasLimit: 100_000_000,
GasPrice: types.NewInt(1),
},
}
outmsgs, err := SelectMessages(ctx, af, &types.TipSet{}, wrapMsgs(msgs))
if err != nil {
t.Fatal(err)
}
assert.Len(t, outmsgs, 5, "filtering didnt work as expected")
}

View File

@ -1,230 +0,0 @@
package miner
import (
"bytes"
"context"
big2 "math/big"
"sort"
"time"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/messagepool/gasguess"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/vm"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/abi/big"
)
func SelectMessages(ctx context.Context, al gasguess.ActorLookup, ts *types.TipSet, msgs []*types.SignedMessage) ([]*types.SignedMessage, error) {
al = (&cachedActorLookup{
tsk: ts.Key(),
cache: map[address.Address]actCacheEntry{},
fallback: al,
}).StateGetActor
type senderMeta struct {
lastReward abi.TokenAmount
lastGasLimit int64
gasReward []abi.TokenAmount
gasLimit []int64
msgs []*types.SignedMessage
}
inclNonces := make(map[address.Address]uint64)
inclBalances := make(map[address.Address]big.Int)
outBySender := make(map[address.Address]*senderMeta)
tooLowFundMsgs := 0
tooHighNonceMsgs := 0
start := build.Clock.Now()
vmValid := time.Duration(0)
getbal := time.Duration(0)
guessGasDur := time.Duration(0)
sort.Slice(msgs, func(i, j int) bool {
return msgs[i].Message.Nonce < msgs[j].Message.Nonce
})
for _, msg := range msgs {
vmstart := build.Clock.Now()
minGas := vm.PricelistByEpoch(ts.Height()).OnChainMessage(msg.ChainLength()) // TODO: really should be doing just msg.ChainLength() but the sync side of this code doesnt seem to have access to that
if err := msg.VMMessage().ValidForBlockInclusion(minGas.Total()); err != nil {
log.Warnf("invalid message in message pool: %s", err)
continue
}
vmValid += build.Clock.Since(vmstart)
// TODO: this should be in some more general 'validate message' call
if msg.Message.GasLimit > build.BlockGasLimit {
log.Warnf("message in mempool had too high of a gas limit (%d)", msg.Message.GasLimit)
continue
}
if msg.Message.To == address.Undef {
log.Warnf("message in mempool had bad 'To' address")
continue
}
from := msg.Message.From
getBalStart := build.Clock.Now()
if _, ok := inclNonces[from]; !ok {
act, err := al(ctx, from, ts.Key())
if err != nil {
log.Warnf("failed to check message sender balance, skipping message: %+v", err)
continue
}
inclNonces[from] = act.Nonce
inclBalances[from] = act.Balance
}
getbal += build.Clock.Since(getBalStart)
if inclBalances[from].LessThan(msg.Message.RequiredFunds()) {
log.Warnf("message had too low funds: %s %d %s %s %s", from, msg.Message.Nonce, inclBalances[from], msg.Message.Value, msg.Message.RequiredFunds())
tooLowFundMsgs++
// todo: drop from mpool
continue
}
if msg.Message.Nonce > inclNonces[from] {
tooHighNonceMsgs++
continue
}
if msg.Message.Nonce < inclNonces[from] {
log.Warnf("message in mempool has already used nonce (%d < %d), from %s, to %s, %s (%d pending for)", msg.Message.Nonce, inclNonces[from], msg.Message.From, msg.Message.To, msg.Cid(), countFrom(msgs, from))
continue
}
inclNonces[from] = msg.Message.Nonce + 1
inclBalances[from] = types.BigSub(inclBalances[from], msg.Message.RequiredFunds())
sm := outBySender[from]
if sm == nil {
sm = &senderMeta{
lastReward: big.Zero(),
}
}
sm.gasLimit = append(sm.gasLimit, sm.lastGasLimit+msg.Message.GasLimit)
sm.lastGasLimit = sm.gasLimit[len(sm.gasLimit)-1]
guessGasStart := build.Clock.Now()
guessedGas, err := gasguess.GuessGasUsed(ctx, ts.Key(), msg, al)
guessGasDur += build.Clock.Since(guessGasStart)
if err != nil {
log.Infow("failed to guess gas", "to", msg.Message.To, "method", msg.Message.Method, "err", err)
}
estimatedReward := big.Mul(types.NewInt(uint64(guessedGas)), msg.Message.GasPrice)
sm.gasReward = append(sm.gasReward, big.Add(sm.lastReward, estimatedReward))
sm.lastReward = sm.gasReward[len(sm.gasReward)-1]
sm.msgs = append(sm.msgs, msg)
outBySender[from] = sm
}
gasLimitLeft := int64(build.BlockGasLimit)
orderedSenders := make([]address.Address, 0, len(outBySender))
for k := range outBySender {
orderedSenders = append(orderedSenders, k)
}
sort.Slice(orderedSenders, func(i, j int) bool {
return bytes.Compare(orderedSenders[i].Bytes(), orderedSenders[j].Bytes()) == -1
})
out := make([]*types.SignedMessage, 0, build.BlockMessageLimit)
{
for {
var bestSender address.Address
var nBest int
var bestGasToReward float64
// TODO: This is O(n^2)-ish, could use something like container/heap to cache this math
for _, sender := range orderedSenders {
meta, ok := outBySender[sender]
if !ok {
continue
}
for n := range meta.msgs {
if meta.gasLimit[n] > gasLimitLeft {
break
}
if n+len(out) > build.BlockMessageLimit {
break
}
gasToReward, _ := new(big2.Float).SetInt(meta.gasReward[n].Int).Float64()
gasToReward /= float64(meta.gasLimit[n])
if gasToReward >= bestGasToReward {
bestSender = sender
nBest = n + 1
bestGasToReward = gasToReward
}
}
}
if nBest == 0 {
log.Warning("nBest is zero: ", gasLimitLeft)
break // block gas limit reached
}
{
out = append(out, outBySender[bestSender].msgs[:nBest]...)
seqGasLimit := outBySender[bestSender].gasLimit[nBest-1]
seqGasReward := outBySender[bestSender].gasReward[nBest-1]
gasLimitLeft -= seqGasLimit
outBySender[bestSender].msgs = outBySender[bestSender].msgs[nBest:]
outBySender[bestSender].gasLimit = outBySender[bestSender].gasLimit[nBest:]
outBySender[bestSender].gasReward = outBySender[bestSender].gasReward[nBest:]
for i := range outBySender[bestSender].gasLimit {
outBySender[bestSender].gasLimit[i] -= seqGasLimit
outBySender[bestSender].gasReward[i] = big.Sub(outBySender[bestSender].gasReward[i], seqGasReward)
}
if len(outBySender[bestSender].msgs) == 0 {
delete(outBySender, bestSender)
}
}
if len(out) >= build.BlockMessageLimit {
break
}
}
}
if tooLowFundMsgs > 0 {
log.Warnf("%d messages in mempool does not have enough funds", tooLowFundMsgs)
}
if tooHighNonceMsgs > 0 {
log.Warnf("%d messages in mempool had too high nonce", tooHighNonceMsgs)
}
sm := build.Clock.Now()
if sm.Sub(start) > time.Second {
log.Warnw("SelectMessages took a long time",
"duration", sm.Sub(start),
"vmvalidate", vmValid,
"getbalance", getbal,
"guessgas", guessGasDur,
"msgs", len(msgs))
}
return out, nil
}

View File

@ -25,6 +25,15 @@ type MpoolAPI struct {
Mpool *messagepool.MessagePool
}
func (a *MpoolAPI) MpoolSelect(ctx context.Context, tsk types.TipSetKey) ([]*types.SignedMessage, error) {
ts, err := a.Chain.GetTipSetFromKey(tsk)
if err != nil {
return nil, xerrors.Errorf("loading tipset %s: %w", tsk, err)
}
return a.Mpool.SelectMessages(ts)
}
func (a *MpoolAPI) MpoolPending(ctx context.Context, tsk types.TipSetKey) ([]*types.SignedMessage, error) {
ts, err := a.Chain.GetTipSetFromKey(tsk)
if err != nil {