enhance SelectMessages method to take a target message

This commit is contained in:
vyzo 2020-08-05 23:16:09 +03:00
parent 5f599dcaf5
commit a08d780bef
2 changed files with 132 additions and 11 deletions

View File

@ -30,11 +30,13 @@ func (mp *MessagePool) pruneMessages(ctx context.Context, ts *types.TipSet) erro
log.Infof("message pruning took %s", time.Since(start))
}()
pending, _ := mp.getPendingMessages(ts, ts)
// Collect all messages to track which ones to remove and create chains for block inclusion
pruneMsgs := make(map[cid.Cid]*types.SignedMessage, mp.currentSize)
var chains []*msgChain
for actor, mset := range mp.pending {
for _, m := range mset.msgs {
for actor, mset := range pending {
for _, m := range mset {
pruneMsgs[m.Message.Cid()] = m
}
actorChains := mp.createMessageChains(actor, mset, ts)

View File

@ -2,6 +2,7 @@ package messagepool
import (
"context"
"golang.org/x/xerrors"
"math/big"
"sort"
"time"
@ -13,6 +14,7 @@ import (
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/vm"
abig "github.com/filecoin-project/specs-actors/actors/abi/big"
"github.com/ipfs/go-cid"
)
var bigBlockGasLimit = big.NewInt(build.BlockGasLimit)
@ -26,26 +28,33 @@ type msgChain struct {
next *msgChain
}
func (mp *MessagePool) SelectMessages() []*types.SignedMessage {
func (mp *MessagePool) SelectMessages(ts *types.TipSet) ([]*types.SignedMessage, error) {
mp.curTsLk.Lock()
ts := mp.curTs
curTs := mp.curTs
mp.curTsLk.Unlock()
mp.lk.Lock()
defer mp.lk.Unlock()
return mp.selectMessages(ts)
return mp.selectMessages(curTs, ts)
}
func (mp *MessagePool) selectMessages(ts *types.TipSet) []*types.SignedMessage {
func (mp *MessagePool) selectMessages(curTs, ts *types.TipSet) ([]*types.SignedMessage, error) {
start := time.Now()
defer func() {
log.Infof("message selection took %s", time.Since(start))
}()
// 0. Load messages for the target tipset; if it is the same as the current tipset in the mpool
// then this is just the pending messages
pending, err := mp.getPendingMessages(curTs, ts)
if err != nil {
return nil, err
}
// 1. Create a list of dependent message chains with maximal gas reward per limit consumed
var chains []*msgChain
for actor, mset := range mp.pending {
for actor, mset := range pending {
next := mp.createMessageChains(actor, mset, ts)
chains = append(chains, next...)
}
@ -117,7 +126,110 @@ tailLoop:
last = len(chains)
}
return result
return result, nil
}
func (mp *MessagePool) getPendingMessages(curTs, ts *types.TipSet) (map[address.Address]map[uint64]*types.SignedMessage, error) {
result := make(map[address.Address]map[uint64]*types.SignedMessage)
haveCids := make(map[cid.Cid]struct{})
// are we in sync?
inSync := false
if curTs.Height() == ts.Height() && curTs.Equals(ts) {
inSync = true
}
// first add our current pending messages
for a, mset := range mp.pending {
if inSync {
// no need to copy the map
result[a] = mset.msgs
} else {
// we need to copy the map to avoid clobbering it as we load more messages
msetCopy := make(map[uint64]*types.SignedMessage, len(mset.msgs))
for nonce, m := range mset.msgs {
msetCopy[nonce] = m
}
result[a] = msetCopy
// mark the messages as seen
for _, m := range mset.msgs {
haveCids[m.Cid()] = struct{}{}
}
}
}
// we are in sync, that's the happy path
if inSync {
return result, nil
}
// nope, we need to sync the tipsets
for {
if curTs.Height() == ts.Height() {
if curTs.Equals(ts) {
return result, nil
}
// different blocks in tipsets -- we mark them as seen so that they are not included in
// in the message set we return, but *neither me (vyzo) nor why understand why*
// this code is also probably completely untested in production, so I am adding a big fat
// warning to revisit this case and sanity check this decision.
log.Warnf("mpool tipset has same height as target tipset but it's not equal; beware of dragons!")
have, err := mp.MessagesForBlocks(ts.Blocks())
if err != nil {
return nil, xerrors.Errorf("error retrieving messages for tipset: %w", err)
}
for _, m := range have {
haveCids[m.Cid()] = struct{}{}
}
}
msgs, err := mp.MessagesForBlocks(ts.Blocks())
if err != nil {
return nil, xerrors.Errorf("error retrieving messages for tipset: %w", err)
}
for _, m := range msgs {
if _, have := haveCids[m.Cid()]; have {
continue
}
haveCids[m.Cid()] = struct{}{}
mset, ok := result[m.Message.From]
if !ok {
mset = make(map[uint64]*types.SignedMessage)
result[m.Message.From] = mset
}
other, dupNonce := mset[m.Message.Nonce]
if dupNonce {
// duplicate nonce, selfishly keep the message with the highest GasPrice
// if the gas prices are the same, keep the one with the highest GasLimit
switch m.Message.GasPrice.Int.Cmp(other.Message.GasPrice.Int) {
case 0:
if m.Message.GasLimit > other.Message.GasLimit {
mset[m.Message.Nonce] = m
}
case 1:
mset[m.Message.Nonce] = m
}
} else {
mset[m.Message.Nonce] = m
}
}
if curTs.Height() >= ts.Height() {
return result, nil
}
ts, err = mp.api.LoadTipSet(ts.Parents())
if err != nil {
return nil, xerrors.Errorf("error loading parent tipset: %w", err)
}
}
}
func (mp *MessagePool) getGasReward(msg *types.SignedMessage, ts *types.TipSet) *big.Int {
@ -146,10 +258,10 @@ func (mp *MessagePool) getGasPerf(gasReward *big.Int, gasLimit int64) float64 {
return r
}
func (mp *MessagePool) createMessageChains(actor address.Address, mset *msgSet, ts *types.TipSet) []*msgChain {
func (mp *MessagePool) createMessageChains(actor address.Address, mset map[uint64]*types.SignedMessage, ts *types.TipSet) []*msgChain {
// collect all messages
msgs := make([]*types.SignedMessage, 0, len(mset.msgs))
for _, m := range mset.msgs {
msgs := make([]*types.SignedMessage, 0, len(mset))
for _, m := range mset {
msgs = append(msgs, m)
}
@ -173,6 +285,13 @@ func (mp *MessagePool) createMessageChains(actor address.Address, mset *msgSet,
rewards := make([]*big.Int, 0, len(msgs))
for i = 0; i < len(msgs); i++ {
m := msgs[i]
if m.Message.Nonce < curNonce {
log.Warnf("encountered message from actor %s with nonce (%d) less than the current nonce (%d)",
actor, m.Message.Nonce, curNonce)
continue
}
if m.Message.Nonce != curNonce {
break
}