bd10bdf99a
* build: Bump version to v1.17.3-dev * build: set version to v1.18.0-dev * chore: actors: Allow builtin-actors to return a map of methods (#9342) * Allow builtin-actors to return a map of methods * go mod * Fix tests * Fix tests, check carefully please * Delete lotus-pond (#9352) * feat: add StateNetworkVersion to mpool API * chore: refactor: rename NewestNetworkVersion * feat: actors: Integrate datacap actor into lotus (#9348) * Integrate datacap actor * Implement datacap actor in chain/builtin * feat: support typed errors over RPC * chore: deps: update to go-jsonrpc 0.1.8 * remove duplicate import * fix: itest: check for closed connection * chore: refactor: move retry test to API * address magik supernit * Add ability to only have single partition per msg for partitions with recovery sectors * doc gen * Address comments * Return beneficiary info from miner state Info() * Update builtin-actors to dev/20220922-v9 which includes FIP-0045 changes in progress * Integrate verifreg changes to lotus * Setup datacap actor * Update builtin-actors to dev/20220922-v9-1 * Update datacap actor to query datacap instead of verifreg * update gst * update markets * update actors with hamt fix * update gst * Update datacap to parse tokens * Update bundles * datacap and verifreg actors use ID addresses without protocol byte * update builtin-actors to rc1 * update go-fil-markets * Update bundles to rc2 * Integrate the v9 migration * Add api for getting allocation * Add upgrade epoch for butterfly * Tweak PreSeal struct to be infra-friendly * docsgen * More tweaking of PreSeal for genesis * review fixes * Use fake cid for test * add butterfly artifacts for oct 5 upgrade * check datacaps for v8 verifreg match v9 datacap actor * Remove print statements * Update to go-state-types master * Update to go-state-types v0.9.0-rc1 * review fixes * use go-fil-markets v1.24.0-v17 * Add accessors for allocations and claims maps * fix: missing permissions tag * butterfly * update butterfly artifacts * sealing pipeline: Prepare deal assigning logic for FIP-45 * sealing pipeline: Get allocationId with StateApi * use NoAllocationID instead of nil AllocationId * address review * Add datacap actor to registry.go * Add cli for listing allocations and removing expired allocations * Update to go-state-types master * deps: upgrade go-merkledag to 0.8.0 * shark params * Update cli/filplus.go Co-authored-by: Aayush Rajasekaran <arajasek94@gmail.com> * revert change to verifreg util * docsgen-cli * miss the stuff * Update FFI * Update go-state-types to v0.9.0 * Update builtin-actors to v9.0.0 * add calib upgrade epcoh * update the upgrade envvar * kill shark * Remove fvm splash banner from nv17 upgrade * check invariance for pending deals and allocations * check pending verified deal proposal migrated to allocation * Add check for unsealed CID in precommit sectors * Fix counting of allocations in nv17 migration test * make gen * pass state trees as pointers * Add assertion that migrations with & without cache are the same * compare allocation to verified deal proposal * Fix miner state precommit info * fix migration test tool * add changelog * Update to go-state-types v0.9.1 * Integrate builtin-actors v9.0.1 * chore: ver: bump version for rc3 (#9512) * Bump version to 1.18.0-rc3 * Update CHANGELOG.md * Update CHANGELOG.md Co-authored-by: Aayush Rajasekaran <arajasek94@gmail.com> * Update CHANGELOG.md Co-authored-by: Aayush Rajasekaran <arajasek94@gmail.com> Co-authored-by: Jiaying Wang <42981373+jennijuju@users.noreply.github.com> Co-authored-by: Aayush Rajasekaran <arajasek94@gmail.com> * Migration: Use autobatch bs * Fix autobatch Signed-off-by: Jakub Sztandera <kubuxu@protocol.ai> * Invoker: Use MethodMeta from go-state-types * Add a second premigration for nv17 * Add more shed tools for migration checking * address review * Lotus release v1.18.0-rc4 * fix: ci: fix app-image build on ci (#9527) * Remove old go version first * Add GO_VERSION file * Use GO_VERSION to set / verify go version * mv GO_VERSION GO_VERSION_MIN * Use GO_VERSION_MIN in Makefile check Co-authored-by: Ian Davis <jungziege@gmail.com> * Update to latest go-state-types for migration fixes * go mod tidy * fix: use api.ErrActorNotFound instead of types.ErrActorNotFound * fix: add fields to ForkUpgradeParams * docs: update actors_version_checklist.md * chore: fix lint * update to go state type v0.9.6 with market migration fix (#9545) * update go-state-types to v-0.9.7 * Add invariant checks to migration * fix invariant check: number of entries in datacap actor should include verifreg * Invariant checks: Only include not-activated deals * test: nv17 migration * Address review * add lotus-shed invariance method * Migration cli takes a stateroot cid and a height * make gen * Update to builtin-actors v9.0.2 * Failing test that shows that notaries can remove datacap from the verifreg actor * Test that should pass when the problem is solved * make gen * Review fixes * statemanager call function will return call information even if call errors * update go-state-types * update builtin-actors * bubble up errors properly from ApplyImplicitMessage * bump to rc5 * set new upgrade heights for calibnet * set new upgrade height for butterfly * tweak calibnet upgrade schedule * clarify changelog note about calibnet * butterfly * update calibnet artifacts * Allow setting local bundles for Debug FVM for av 9+ * fix: autobatch: remove potential deadlock when a block is missing Check the _underlying_ blockstore instead of recursing. Also, drop the lock before we do that. * fix imports * build: set shark mainnet epoch (#9640) * chore: build: Lotus release v1.18.0 (#9641) * Lotus release v1.18.0 * add changelog * address review * changelog improvement Co-authored-by: Jennifer Wang <jiayingw703@gmail.com> Co-authored-by: Jiaying Wang <42981373+jennijuju@users.noreply.github.com> Signed-off-by: Jakub Sztandera <kubuxu@protocol.ai> Co-authored-by: Łukasz Magiera <magik6k@gmail.com> Co-authored-by: Łukasz Magiera <magik6k@users.noreply.github.com> Co-authored-by: Aayush <arajasek94@gmail.com> Co-authored-by: Geoff Stuart <geoff.vball@gmail.com> Co-authored-by: Shrenuj Bansal <shrenuj.bansal@protocol.ai> Co-authored-by: simlecode <69969590+simlecode@users.noreply.github.com> Co-authored-by: Rod Vagg <rod@vagg.org> Co-authored-by: Jakub Sztandera <kubuxu@protocol.ai> Co-authored-by: Ian Davis <jungziege@gmail.com> Co-authored-by: zenground0 <ZenGround0@users.noreply.github.com> Co-authored-by: Steven Allen <steven@stebalien.com>
1016 lines
28 KiB
Go
1016 lines
28 KiB
Go
package messagepool
|
|
|
|
import (
|
|
"context"
|
|
"math/big"
|
|
"math/rand"
|
|
"sort"
|
|
"time"
|
|
|
|
cbg "github.com/whyrusleeping/cbor-gen"
|
|
"golang.org/x/xerrors"
|
|
|
|
"github.com/filecoin-project/go-address"
|
|
tbig "github.com/filecoin-project/go-state-types/big"
|
|
"github.com/filecoin-project/go-state-types/crypto"
|
|
|
|
"github.com/filecoin-project/lotus/build"
|
|
"github.com/filecoin-project/lotus/chain/messagepool/gasguess"
|
|
"github.com/filecoin-project/lotus/chain/types"
|
|
"github.com/filecoin-project/lotus/chain/vm"
|
|
)
|
|
|
|
var bigBlockGasLimit = big.NewInt(build.BlockGasLimit)
|
|
|
|
const MaxBlocks = 15
|
|
|
|
type msgChain struct {
|
|
msgs []*types.SignedMessage
|
|
gasReward *big.Int
|
|
gasLimit int64
|
|
gasPerf float64
|
|
effPerf float64
|
|
bp float64
|
|
parentOffset float64
|
|
valid bool
|
|
merged bool
|
|
next *msgChain
|
|
prev *msgChain
|
|
sigType crypto.SigType
|
|
}
|
|
|
|
func (mp *MessagePool) SelectMessages(ctx context.Context, ts *types.TipSet, tq float64) ([]*types.SignedMessage, error) {
|
|
mp.curTsLk.Lock()
|
|
defer mp.curTsLk.Unlock()
|
|
|
|
mp.lk.Lock()
|
|
defer mp.lk.Unlock()
|
|
|
|
// if the ticket quality is high enough that the first block has higher probability
|
|
// than any other block, then we don't bother with optimal selection because the
|
|
// first block will always have higher effective performance
|
|
var sm *selectedMessages
|
|
var err error
|
|
if tq > 0.84 {
|
|
sm, err = mp.selectMessagesGreedy(ctx, mp.curTs, ts)
|
|
} else {
|
|
sm, err = mp.selectMessagesOptimal(ctx, mp.curTs, ts, tq)
|
|
}
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if sm == nil {
|
|
return nil, nil
|
|
}
|
|
|
|
// one last sanity check
|
|
if len(sm.msgs) > build.BlockMessageLimit {
|
|
log.Errorf("message selection chose too many messages %d > %d", len(sm.msgs), build.BlockMessageLimit)
|
|
sm.msgs = sm.msgs[:build.BlockMessageLimit]
|
|
}
|
|
|
|
return sm.msgs, nil
|
|
}
|
|
|
|
type selectedMessages struct {
|
|
msgs []*types.SignedMessage
|
|
gasLimit int64
|
|
secpLimit int
|
|
blsLimit int
|
|
}
|
|
|
|
// returns false if chain can't be added due to block constraints
|
|
func (sm *selectedMessages) tryToAdd(mc *msgChain) bool {
|
|
l := len(mc.msgs)
|
|
|
|
if build.BlockMessageLimit < l+len(sm.msgs) || sm.gasLimit < mc.gasLimit {
|
|
return false
|
|
}
|
|
|
|
if mc.sigType == crypto.SigTypeBLS {
|
|
if sm.blsLimit < l {
|
|
return false
|
|
}
|
|
|
|
sm.msgs = append(sm.msgs, mc.msgs...)
|
|
sm.blsLimit -= l
|
|
sm.gasLimit -= mc.gasLimit
|
|
} else if mc.sigType == crypto.SigTypeSecp256k1 {
|
|
if sm.secpLimit < l {
|
|
return false
|
|
}
|
|
|
|
sm.msgs = append(sm.msgs, mc.msgs...)
|
|
sm.secpLimit -= l
|
|
sm.gasLimit -= mc.gasLimit
|
|
}
|
|
|
|
// don't add the weird sigType msg, but otherwise proceed
|
|
return true
|
|
}
|
|
|
|
// returns false if messages can't be added due to block constraints
|
|
// will trim / invalidate chain as appropriate
|
|
func (sm *selectedMessages) tryToAddWithDeps(mc *msgChain, mp *MessagePool, baseFee types.BigInt) bool {
|
|
// compute the dependencies that must be merged and the gas limit including deps
|
|
chainGasLimit := mc.gasLimit
|
|
chainMsgLimit := len(mc.msgs)
|
|
depGasLimit := int64(0)
|
|
depMsgLimit := 0
|
|
smMsgLimit := 0
|
|
|
|
if mc.sigType == crypto.SigTypeBLS {
|
|
smMsgLimit = sm.blsLimit
|
|
} else if mc.sigType == crypto.SigTypeSecp256k1 {
|
|
smMsgLimit = sm.secpLimit
|
|
} else {
|
|
return false
|
|
}
|
|
|
|
if smMsgLimit > build.BlockMessageLimit-len(sm.msgs) {
|
|
smMsgLimit = build.BlockMessageLimit - len(sm.msgs)
|
|
}
|
|
|
|
var chainDeps []*msgChain
|
|
for curChain := mc.prev; curChain != nil && !curChain.merged; curChain = curChain.prev {
|
|
chainDeps = append(chainDeps, curChain)
|
|
chainGasLimit += curChain.gasLimit
|
|
chainMsgLimit += len(curChain.msgs)
|
|
depGasLimit += curChain.gasLimit
|
|
depMsgLimit += len(curChain.msgs)
|
|
}
|
|
|
|
// the chain doesn't fit as-is, so trim / invalidate it and return false
|
|
if chainGasLimit > sm.gasLimit || chainMsgLimit > smMsgLimit {
|
|
|
|
// it doesn't all fit; now we have to take into account the dependent chains before
|
|
// making a decision about trimming or invalidating.
|
|
// if the dependencies exceed the block limits, then we must invalidate the chain
|
|
// as it can never be included.
|
|
// Otherwise we can just trim and continue
|
|
if depGasLimit > sm.gasLimit || depMsgLimit >= smMsgLimit {
|
|
mc.Invalidate()
|
|
} else {
|
|
// dependencies fit, just trim it
|
|
mc.Trim(sm.gasLimit-depGasLimit, smMsgLimit-depMsgLimit, mp, baseFee)
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
// the chain fits! include it together with all dependencies
|
|
for i := len(chainDeps) - 1; i >= 0; i-- {
|
|
curChain := chainDeps[i]
|
|
curChain.merged = true
|
|
sm.msgs = append(sm.msgs, curChain.msgs...)
|
|
}
|
|
|
|
mc.merged = true
|
|
|
|
sm.msgs = append(sm.msgs, mc.msgs...)
|
|
sm.gasLimit -= chainGasLimit
|
|
|
|
if mc.sigType == crypto.SigTypeBLS {
|
|
sm.blsLimit -= chainMsgLimit
|
|
} else if mc.sigType == crypto.SigTypeSecp256k1 {
|
|
sm.secpLimit -= chainMsgLimit
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
func (sm *selectedMessages) trimChain(mc *msgChain, mp *MessagePool, baseFee types.BigInt) {
|
|
msgLimit := build.BlockMessageLimit - len(sm.msgs)
|
|
if mc.sigType == crypto.SigTypeBLS {
|
|
if msgLimit > sm.blsLimit {
|
|
msgLimit = sm.blsLimit
|
|
}
|
|
} else if mc.sigType == crypto.SigTypeSecp256k1 {
|
|
if msgLimit > sm.secpLimit {
|
|
msgLimit = sm.secpLimit
|
|
}
|
|
}
|
|
|
|
if mc.gasLimit > sm.gasLimit || len(mc.msgs) > msgLimit {
|
|
mc.Trim(sm.gasLimit, msgLimit, mp, baseFee)
|
|
}
|
|
}
|
|
|
|
func (mp *MessagePool) selectMessagesOptimal(ctx context.Context, curTs, ts *types.TipSet, tq float64) (*selectedMessages, error) {
|
|
start := time.Now()
|
|
|
|
baseFee, err := mp.api.ChainComputeBaseFee(context.TODO(), ts)
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("computing basefee: %w", err)
|
|
}
|
|
|
|
// 0. Load messages from the target tipset; if it is the same as the current tipset in
|
|
// the mpool, then this is just the pending messages
|
|
pending, err := mp.getPendingMessages(ctx, curTs, ts)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if len(pending) == 0 {
|
|
return nil, nil
|
|
}
|
|
|
|
// defer only here so if we have no pending messages we don't spam
|
|
defer func() {
|
|
log.Infow("message selection done", "took", time.Since(start))
|
|
}()
|
|
|
|
// 0b. Select all priority messages that fit in the block
|
|
minGas := int64(gasguess.MinGas)
|
|
result := mp.selectPriorityMessages(ctx, pending, baseFee, ts)
|
|
|
|
// have we filled the block?
|
|
if result.gasLimit < minGas || len(result.msgs) >= build.BlockMessageLimit {
|
|
return result, nil
|
|
}
|
|
|
|
// 1. Create a list of dependent message chains with maximal gas reward per limit consumed
|
|
startChains := time.Now()
|
|
var chains []*msgChain
|
|
for actor, mset := range pending {
|
|
next := mp.createMessageChains(actor, mset, baseFee, ts)
|
|
chains = append(chains, next...)
|
|
}
|
|
if dt := time.Since(startChains); dt > time.Millisecond {
|
|
log.Infow("create message chains done", "took", dt)
|
|
}
|
|
|
|
// 2. Sort the chains
|
|
sort.Slice(chains, func(i, j int) bool {
|
|
return chains[i].Before(chains[j])
|
|
})
|
|
|
|
if len(chains) != 0 && chains[0].gasPerf < 0 {
|
|
log.Warnw("all messages in mpool have non-positive gas performance", "bestGasPerf", chains[0].gasPerf)
|
|
return result, nil
|
|
}
|
|
|
|
// 3. Partition chains into blocks (without trimming)
|
|
// we use the full blockGasLimit (as opposed to the residual gas limit from the
|
|
// priority message selection) as we have to account for what other block providers are doing
|
|
nextChain := 0
|
|
partitions := make([][]*msgChain, MaxBlocks)
|
|
for i := 0; i < MaxBlocks && nextChain < len(chains); i++ {
|
|
gasLimit := build.BlockGasLimit
|
|
msgLimit := build.BlockMessageLimit
|
|
for nextChain < len(chains) {
|
|
chain := chains[nextChain]
|
|
nextChain++
|
|
partitions[i] = append(partitions[i], chain)
|
|
gasLimit -= chain.gasLimit
|
|
msgLimit -= len(chain.msgs)
|
|
if gasLimit < minGas || msgLimit <= 0 {
|
|
break
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
// 4. Compute effective performance for each chain, based on the partition they fall into
|
|
// The effective performance is the gasPerf of the chain * block probability
|
|
blockProb := mp.blockProbabilities(tq)
|
|
effChains := 0
|
|
for i := 0; i < MaxBlocks; i++ {
|
|
for _, chain := range partitions[i] {
|
|
chain.SetEffectivePerf(blockProb[i])
|
|
}
|
|
effChains += len(partitions[i])
|
|
}
|
|
|
|
// nullify the effective performance of chains that don't fit in any partition
|
|
for _, chain := range chains[effChains:] {
|
|
chain.SetNullEffectivePerf()
|
|
}
|
|
|
|
// 5. Resort the chains based on effective performance
|
|
sort.Slice(chains, func(i, j int) bool {
|
|
return chains[i].BeforeEffective(chains[j])
|
|
})
|
|
|
|
// 6. Merge the head chains to produce the list of messages selected for inclusion
|
|
// subject to the residual block limits
|
|
// When a chain is merged in, all its previous dependent chains *must* also be
|
|
// merged in or we'll have a broken block
|
|
startMerge := time.Now()
|
|
last := len(chains)
|
|
for i, chain := range chains {
|
|
// did we run out of performing chains?
|
|
if chain.gasPerf < 0 {
|
|
break
|
|
}
|
|
|
|
// has it already been merged?
|
|
if chain.merged {
|
|
continue
|
|
}
|
|
|
|
if result.tryToAddWithDeps(chain, mp, baseFee) {
|
|
// adjust the effective performance for all subsequent chains
|
|
if next := chain.next; next != nil && next.effPerf > 0 {
|
|
next.effPerf += next.parentOffset
|
|
for next = next.next; next != nil && next.effPerf > 0; next = next.next {
|
|
next.setEffPerf()
|
|
}
|
|
}
|
|
|
|
// re-sort to account for already merged chains and effective performance adjustments
|
|
// the sort *must* be stable or we end up getting negative gasPerfs pushed up.
|
|
sort.SliceStable(chains[i+1:], func(i, j int) bool {
|
|
return chains[i].BeforeEffective(chains[j])
|
|
})
|
|
|
|
continue
|
|
}
|
|
|
|
// we can't fit this chain and its dependencies because of block limits -- we are
|
|
// at the edge
|
|
last = i
|
|
break
|
|
}
|
|
if dt := time.Since(startMerge); dt > time.Millisecond {
|
|
log.Infow("merge message chains done", "took", dt)
|
|
}
|
|
|
|
// 7. We have reached the edge of what can fit wholesale; if we still hae available
|
|
// gasLimit to pack some more chains, then trim the last chain and push it down.
|
|
// Trimming invalidates subsequent dependent chains so that they can't be selected
|
|
// as their dependency cannot be (fully) included.
|
|
// We do this in a loop because the blocker might have been inordinately large and
|
|
// we might have to do it multiple times to satisfy tail packing
|
|
startTail := time.Now()
|
|
tailLoop:
|
|
for result.gasLimit >= minGas && last < len(chains) {
|
|
|
|
if !chains[last].valid {
|
|
last++
|
|
continue tailLoop
|
|
}
|
|
|
|
// trim if necessary
|
|
result.trimChain(chains[last], mp, baseFee)
|
|
|
|
// push down if it hasn't been invalidated
|
|
if chains[last].valid {
|
|
for i := last; i < len(chains)-1; i++ {
|
|
if chains[i].BeforeEffective(chains[i+1]) {
|
|
break
|
|
}
|
|
chains[i], chains[i+1] = chains[i+1], chains[i]
|
|
}
|
|
}
|
|
|
|
// select the next (valid and fitting) chain and its dependencies for inclusion
|
|
for _, chain := range chains[last:] {
|
|
// has the chain been invalidated?
|
|
if !chain.valid {
|
|
continue
|
|
}
|
|
|
|
// has it already been merged?
|
|
if chain.merged {
|
|
continue
|
|
}
|
|
|
|
// if gasPerf < 0 we have no more profitable chains
|
|
if chain.gasPerf < 0 {
|
|
break tailLoop
|
|
}
|
|
|
|
if result.tryToAddWithDeps(chain, mp, baseFee) {
|
|
continue
|
|
}
|
|
|
|
continue tailLoop
|
|
}
|
|
|
|
// the merge loop ended after processing all the chains and we we probably have still
|
|
// gas to spare; end the loop.
|
|
break
|
|
}
|
|
if dt := time.Since(startTail); dt > time.Millisecond {
|
|
log.Infow("pack tail chains done", "took", dt)
|
|
}
|
|
|
|
// if we have room to spare, pick some random (non-negative) chains to fill the block
|
|
// we pick randomly so that we minimize the probability of duplication among all block producers
|
|
if result.gasLimit >= minGas && len(result.msgs) <= build.BlockMessageLimit {
|
|
preRandomLength := len(result.msgs)
|
|
|
|
startRandom := time.Now()
|
|
shuffleChains(chains)
|
|
|
|
for _, chain := range chains {
|
|
// have we filled the block
|
|
if result.gasLimit < minGas || len(result.msgs) >= build.BlockMessageLimit {
|
|
break
|
|
}
|
|
|
|
// has it been merged or invalidated?
|
|
if chain.merged || !chain.valid {
|
|
continue
|
|
}
|
|
|
|
// is it negative?
|
|
if chain.gasPerf < 0 {
|
|
continue
|
|
}
|
|
|
|
if result.tryToAddWithDeps(chain, mp, baseFee) {
|
|
continue
|
|
}
|
|
|
|
if chain.valid {
|
|
// chain got trimmed on the previous call to tryToAddWithDeps, can now be included
|
|
result.tryToAddWithDeps(chain, mp, baseFee)
|
|
continue
|
|
}
|
|
}
|
|
|
|
if dt := time.Since(startRandom); dt > time.Millisecond {
|
|
log.Infow("pack random tail chains done", "took", dt)
|
|
}
|
|
|
|
if len(result.msgs) != preRandomLength {
|
|
log.Warnf("optimal selection failed to pack a block; picked %d messages with random selection",
|
|
len(result.msgs)-preRandomLength)
|
|
}
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
func (mp *MessagePool) selectMessagesGreedy(ctx context.Context, curTs, ts *types.TipSet) (*selectedMessages, error) {
|
|
start := time.Now()
|
|
|
|
baseFee, err := mp.api.ChainComputeBaseFee(context.TODO(), ts)
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("computing basefee: %w", err)
|
|
}
|
|
|
|
// 0. Load messages for the target tipset; if it is the same as the current tipset in the mpool
|
|
// then this is just the pending messages
|
|
pending, err := mp.getPendingMessages(ctx, curTs, ts)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if len(pending) == 0 {
|
|
return nil, nil
|
|
}
|
|
|
|
// defer only here so if we have no pending messages we don't spam
|
|
defer func() {
|
|
log.Infow("message selection done", "took", time.Since(start))
|
|
}()
|
|
|
|
// 0b. Select all priority messages that fit in the block
|
|
minGas := int64(gasguess.MinGas)
|
|
result := mp.selectPriorityMessages(ctx, pending, baseFee, ts)
|
|
|
|
// have we filled the block?
|
|
if result.gasLimit < minGas || len(result.msgs) > build.BlockMessageLimit {
|
|
return result, nil
|
|
}
|
|
|
|
// 1. Create a list of dependent message chains with maximal gas reward per limit consumed
|
|
startChains := time.Now()
|
|
var chains []*msgChain
|
|
for actor, mset := range pending {
|
|
next := mp.createMessageChains(actor, mset, baseFee, ts)
|
|
chains = append(chains, next...)
|
|
}
|
|
if dt := time.Since(startChains); dt > time.Millisecond {
|
|
log.Infow("create message chains done", "took", dt)
|
|
}
|
|
|
|
// 2. Sort the chains
|
|
sort.Slice(chains, func(i, j int) bool {
|
|
return chains[i].Before(chains[j])
|
|
})
|
|
|
|
if len(chains) != 0 && chains[0].gasPerf < 0 {
|
|
log.Warnw("all messages in mpool have non-positive gas performance", "bestGasPerf", chains[0].gasPerf)
|
|
return result, nil
|
|
}
|
|
|
|
// 3. Merge the head chains to produce the list of messages selected for inclusion, subject to
|
|
// the block gas and message limits.
|
|
startMerge := time.Now()
|
|
last := len(chains)
|
|
for i, chain := range chains {
|
|
// did we run out of performing chains?
|
|
if chain.gasPerf < 0 {
|
|
break
|
|
}
|
|
|
|
// does it fit in the block?
|
|
if result.tryToAdd(chain) {
|
|
// there was room, we added the chain, keep going
|
|
continue
|
|
}
|
|
|
|
// we can't fit this chain because of block limits -- we are at the edge
|
|
last = i
|
|
break
|
|
}
|
|
if dt := time.Since(startMerge); dt > time.Millisecond {
|
|
log.Infow("merge message chains done", "took", dt)
|
|
}
|
|
|
|
// 4. We have reached the edge of what we can fit wholesale; if we still have available gasLimit
|
|
// to pack some more chains, then trim the last chain and push it down.
|
|
// Trimming invalidates subsequent dependent chains so that they can't be selected as their
|
|
// dependency cannot be (fully) included.
|
|
// We do this in a loop because the blocker might have been inordinately large and we might
|
|
// have to do it multiple times to satisfy tail packing.
|
|
startTail := time.Now()
|
|
tailLoop:
|
|
for result.gasLimit >= minGas && last < len(chains) {
|
|
// trim
|
|
result.trimChain(chains[last], mp, baseFee)
|
|
|
|
// push down if it hasn't been invalidated
|
|
if chains[last].valid {
|
|
for i := last; i < len(chains)-1; i++ {
|
|
if chains[i].Before(chains[i+1]) {
|
|
break
|
|
}
|
|
chains[i], chains[i+1] = chains[i+1], chains[i]
|
|
}
|
|
}
|
|
|
|
// select the next (valid and fitting) chain for inclusion
|
|
for i, chain := range chains[last:] {
|
|
// has the chain been invalidated?
|
|
if !chain.valid {
|
|
continue
|
|
}
|
|
|
|
// if gasPerf < 0 we have no more profitable chains
|
|
if chain.gasPerf < 0 {
|
|
break tailLoop
|
|
}
|
|
|
|
// does it fit in the bock?
|
|
if result.tryToAdd(chain) {
|
|
// there was room, we added the chain, keep going
|
|
continue
|
|
}
|
|
|
|
// this chain needs to be trimmed
|
|
last += i
|
|
continue tailLoop
|
|
}
|
|
|
|
// the merge loop ended after processing all the chains and we probably still have
|
|
// gas to spare; end the loop
|
|
break
|
|
}
|
|
if dt := time.Since(startTail); dt > time.Millisecond {
|
|
log.Infow("pack tail chains done", "took", dt)
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
func (mp *MessagePool) selectPriorityMessages(ctx context.Context, pending map[address.Address]map[uint64]*types.SignedMessage, baseFee types.BigInt, ts *types.TipSet) *selectedMessages {
|
|
start := time.Now()
|
|
defer func() {
|
|
if dt := time.Since(start); dt > time.Millisecond {
|
|
log.Infow("select priority messages done", "took", dt)
|
|
}
|
|
}()
|
|
mpCfg := mp.getConfig()
|
|
result := &selectedMessages{
|
|
msgs: make([]*types.SignedMessage, 0, mpCfg.SizeLimitLow),
|
|
gasLimit: build.BlockGasLimit,
|
|
blsLimit: cbg.MaxLength,
|
|
secpLimit: cbg.MaxLength,
|
|
}
|
|
minGas := int64(gasguess.MinGas)
|
|
|
|
// 1. Get priority actor chains
|
|
var chains []*msgChain
|
|
priority := mpCfg.PriorityAddrs
|
|
for _, actor := range priority {
|
|
pk, err := mp.resolveToKey(ctx, actor)
|
|
if err != nil {
|
|
log.Debugf("mpooladdlocal failed to resolve sender: %s", err)
|
|
return result
|
|
}
|
|
|
|
mset, ok := pending[pk]
|
|
if ok {
|
|
// remove actor from pending set as we are already processed these messages
|
|
delete(pending, pk)
|
|
// create chains for the priority actor
|
|
next := mp.createMessageChains(actor, mset, baseFee, ts)
|
|
chains = append(chains, next...)
|
|
}
|
|
}
|
|
if len(chains) == 0 {
|
|
return result
|
|
}
|
|
|
|
// 2. Sort the chains
|
|
sort.Slice(chains, func(i, j int) bool {
|
|
return chains[i].Before(chains[j])
|
|
})
|
|
|
|
if len(chains) != 0 && chains[0].gasPerf < 0 {
|
|
log.Warnw("all priority messages in mpool have negative gas performance", "bestGasPerf", chains[0].gasPerf)
|
|
return result
|
|
}
|
|
|
|
// 3. Merge chains until the block limit, as long as they have non-negative gas performance
|
|
last := len(chains)
|
|
for i, chain := range chains {
|
|
if chain.gasPerf < 0 {
|
|
break
|
|
}
|
|
|
|
if result.tryToAdd(chain) {
|
|
// there was room, we added the chain, keep going
|
|
continue
|
|
}
|
|
|
|
// we can't fit this chain because of block gasLimit -- we are at the edge
|
|
last = i
|
|
break
|
|
}
|
|
|
|
tailLoop:
|
|
for result.gasLimit >= minGas && last < len(chains) {
|
|
// trim, discarding negative performing messages
|
|
|
|
result.trimChain(chains[last], mp, baseFee)
|
|
|
|
// push down if it hasn't been invalidated
|
|
if chains[last].valid {
|
|
for i := last; i < len(chains)-1; i++ {
|
|
if chains[i].Before(chains[i+1]) {
|
|
break
|
|
}
|
|
chains[i], chains[i+1] = chains[i+1], chains[i]
|
|
}
|
|
}
|
|
|
|
// select the next (valid and fitting) chain for inclusion
|
|
for i, chain := range chains[last:] {
|
|
// has the chain been invalidated
|
|
if !chain.valid {
|
|
continue
|
|
}
|
|
|
|
// if gasPerf < 0 we have no more profitable chains
|
|
if chain.gasPerf < 0 {
|
|
break tailLoop
|
|
}
|
|
|
|
// does it fit in the bock?
|
|
if result.tryToAdd(chain) {
|
|
// there was room, we added the chain, keep going
|
|
continue
|
|
}
|
|
|
|
// this chain needs to be trimmed
|
|
last += i
|
|
continue tailLoop
|
|
}
|
|
|
|
// the merge loop ended after processing all the chains and we probably still have gas to spare;
|
|
// end the loop
|
|
break
|
|
}
|
|
|
|
return result
|
|
}
|
|
|
|
func (mp *MessagePool) getPendingMessages(ctx context.Context, curTs, ts *types.TipSet) (map[address.Address]map[uint64]*types.SignedMessage, error) {
|
|
start := time.Now()
|
|
|
|
result := make(map[address.Address]map[uint64]*types.SignedMessage)
|
|
defer func() {
|
|
if dt := time.Since(start); dt > time.Millisecond {
|
|
log.Infow("get pending messages done", "took", dt)
|
|
}
|
|
}()
|
|
|
|
// are we in sync?
|
|
inSync := false
|
|
if curTs.Height() == ts.Height() && curTs.Equals(ts) {
|
|
inSync = true
|
|
}
|
|
|
|
mp.forEachPending(func(a address.Address, mset *msgSet) {
|
|
if inSync {
|
|
// no need to copy the map
|
|
result[a] = mset.msgs
|
|
} else {
|
|
// we need to copy the map to avoid clobbering it as we load more messages
|
|
msetCopy := make(map[uint64]*types.SignedMessage, len(mset.msgs))
|
|
for nonce, m := range mset.msgs {
|
|
msetCopy[nonce] = m
|
|
}
|
|
result[a] = msetCopy
|
|
|
|
}
|
|
})
|
|
|
|
// we are in sync, that's the happy path
|
|
if inSync {
|
|
return result, nil
|
|
}
|
|
|
|
if err := mp.runHeadChange(ctx, curTs, ts, result); err != nil {
|
|
return nil, xerrors.Errorf("failed to process difference between mpool head and given head: %w", err)
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
func (*MessagePool) getGasReward(msg *types.SignedMessage, baseFee types.BigInt) *big.Int {
|
|
maxPremium := types.BigSub(msg.Message.GasFeeCap, baseFee)
|
|
|
|
if types.BigCmp(maxPremium, msg.Message.GasPremium) > 0 {
|
|
maxPremium = msg.Message.GasPremium
|
|
}
|
|
|
|
gasReward := tbig.Mul(maxPremium, types.NewInt(uint64(msg.Message.GasLimit)))
|
|
if gasReward.Sign() == -1 {
|
|
// penalty multiplier
|
|
gasReward = tbig.Mul(gasReward, types.NewInt(3))
|
|
}
|
|
return gasReward.Int
|
|
}
|
|
|
|
func (*MessagePool) getGasPerf(gasReward *big.Int, gasLimit int64) float64 {
|
|
// gasPerf = gasReward * build.BlockGasLimit / gasLimit
|
|
a := new(big.Rat).SetInt(new(big.Int).Mul(gasReward, bigBlockGasLimit))
|
|
b := big.NewRat(1, gasLimit)
|
|
c := new(big.Rat).Mul(a, b)
|
|
r, _ := c.Float64()
|
|
return r
|
|
}
|
|
|
|
func (mp *MessagePool) createMessageChains(actor address.Address, mset map[uint64]*types.SignedMessage, baseFee types.BigInt, ts *types.TipSet) []*msgChain {
|
|
// collect all messages
|
|
msgs := make([]*types.SignedMessage, 0, len(mset))
|
|
for _, m := range mset {
|
|
msgs = append(msgs, m)
|
|
}
|
|
|
|
// sort by nonce
|
|
sort.Slice(msgs, func(i, j int) bool {
|
|
return msgs[i].Message.Nonce < msgs[j].Message.Nonce
|
|
})
|
|
|
|
// sanity checks:
|
|
// - there can be no gaps in nonces, starting from the current actor nonce
|
|
// if there is a gap, drop messages after the gap, we can't include them
|
|
// - all messages must have minimum gas and the total gas for the candidate messages
|
|
// cannot exceed the block limit; drop all messages that exceed the limit
|
|
// - the total gasReward cannot exceed the actor's balance; drop all messages that exceed
|
|
// the balance
|
|
a, err := mp.api.GetActorAfter(actor, ts)
|
|
if err != nil {
|
|
log.Errorf("failed to load actor state, not building chain for %s: %v", actor, err)
|
|
return nil
|
|
}
|
|
|
|
curNonce := a.Nonce
|
|
balance := a.Balance.Int
|
|
gasLimit := int64(0)
|
|
skip := 0
|
|
i := 0
|
|
rewards := make([]*big.Int, 0, len(msgs))
|
|
for i = 0; i < len(msgs); i++ {
|
|
m := msgs[i]
|
|
|
|
if m.Message.Nonce < curNonce {
|
|
log.Warnf("encountered message from actor %s with nonce (%d) less than the current nonce (%d)",
|
|
actor, m.Message.Nonce, curNonce)
|
|
skip++
|
|
continue
|
|
}
|
|
|
|
if m.Message.Nonce != curNonce {
|
|
break
|
|
}
|
|
curNonce++
|
|
|
|
minGas := vm.PricelistByEpoch(ts.Height()).OnChainMessage(m.ChainLength()).Total()
|
|
if m.Message.GasLimit < minGas {
|
|
break
|
|
}
|
|
|
|
gasLimit += m.Message.GasLimit
|
|
if gasLimit > build.BlockGasLimit {
|
|
break
|
|
}
|
|
|
|
required := m.Message.RequiredFunds().Int
|
|
if balance.Cmp(required) < 0 {
|
|
break
|
|
}
|
|
|
|
balance = new(big.Int).Sub(balance, required)
|
|
|
|
value := m.Message.Value.Int
|
|
balance = new(big.Int).Sub(balance, value)
|
|
|
|
gasReward := mp.getGasReward(m, baseFee)
|
|
rewards = append(rewards, gasReward)
|
|
}
|
|
|
|
// check we have a sane set of messages to construct the chains
|
|
if i > skip {
|
|
msgs = msgs[skip:i]
|
|
} else {
|
|
return nil
|
|
}
|
|
|
|
// if we have more messages from this sender than can fit in a block, drop the extra ones
|
|
if len(msgs) > build.BlockMessageLimit {
|
|
msgs = msgs[:build.BlockMessageLimit]
|
|
}
|
|
|
|
// ok, now we can construct the chains using the messages we have
|
|
// invariant: each chain has a bigger gasPerf than the next -- otherwise they can be merged
|
|
// and increase the gasPerf of the first chain
|
|
// We do this in two passes:
|
|
// - in the first pass we create chains that aggregate messages with non-decreasing gasPerf
|
|
// - in the second pass we merge chains to maintain the invariant.
|
|
var chains []*msgChain
|
|
var curChain *msgChain
|
|
|
|
newChain := func(m *types.SignedMessage, i int) *msgChain {
|
|
chain := new(msgChain)
|
|
chain.msgs = []*types.SignedMessage{m}
|
|
chain.gasReward = rewards[i]
|
|
chain.gasLimit = m.Message.GasLimit
|
|
chain.gasPerf = mp.getGasPerf(chain.gasReward, chain.gasLimit)
|
|
chain.valid = true
|
|
chain.sigType = m.Signature.Type
|
|
return chain
|
|
}
|
|
|
|
// create the individual chains
|
|
for i, m := range msgs {
|
|
if curChain == nil {
|
|
curChain = newChain(m, i)
|
|
continue
|
|
}
|
|
|
|
gasReward := new(big.Int).Add(curChain.gasReward, rewards[i])
|
|
gasLimit := curChain.gasLimit + m.Message.GasLimit
|
|
gasPerf := mp.getGasPerf(gasReward, gasLimit)
|
|
|
|
// try to add the message to the current chain -- if it decreases the gasPerf, or then make a
|
|
// new chain
|
|
if gasPerf < curChain.gasPerf {
|
|
chains = append(chains, curChain)
|
|
curChain = newChain(m, i)
|
|
} else {
|
|
curChain.msgs = append(curChain.msgs, m)
|
|
curChain.gasReward = gasReward
|
|
curChain.gasLimit = gasLimit
|
|
curChain.gasPerf = gasPerf
|
|
}
|
|
}
|
|
chains = append(chains, curChain)
|
|
|
|
// merge chains to maintain the invariant
|
|
for {
|
|
merged := 0
|
|
|
|
for i := len(chains) - 1; i > 0; i-- {
|
|
if chains[i].gasPerf >= chains[i-1].gasPerf {
|
|
chains[i-1].msgs = append(chains[i-1].msgs, chains[i].msgs...)
|
|
chains[i-1].gasReward = new(big.Int).Add(chains[i-1].gasReward, chains[i].gasReward)
|
|
chains[i-1].gasLimit += chains[i].gasLimit
|
|
chains[i-1].gasPerf = mp.getGasPerf(chains[i-1].gasReward, chains[i-1].gasLimit)
|
|
chains[i].valid = false
|
|
merged++
|
|
}
|
|
}
|
|
|
|
if merged == 0 {
|
|
break
|
|
}
|
|
|
|
// drop invalidated chains
|
|
newChains := make([]*msgChain, 0, len(chains)-merged)
|
|
for _, c := range chains {
|
|
if c.valid {
|
|
newChains = append(newChains, c)
|
|
}
|
|
}
|
|
chains = newChains
|
|
}
|
|
|
|
// link dependent chains
|
|
for i := 0; i < len(chains)-1; i++ {
|
|
chains[i].next = chains[i+1]
|
|
}
|
|
|
|
for i := len(chains) - 1; i > 0; i-- {
|
|
chains[i].prev = chains[i-1]
|
|
}
|
|
|
|
return chains
|
|
}
|
|
|
|
func (mc *msgChain) Before(other *msgChain) bool {
|
|
return mc.gasPerf > other.gasPerf ||
|
|
(mc.gasPerf == other.gasPerf && mc.gasReward.Cmp(other.gasReward) > 0)
|
|
}
|
|
|
|
func (mc *msgChain) Trim(gasLimit int64, msgLimit int, mp *MessagePool, baseFee types.BigInt) {
|
|
i := len(mc.msgs) - 1
|
|
for i >= 0 && (mc.gasLimit > gasLimit || mc.gasPerf < 0 || i >= msgLimit) {
|
|
gasReward := mp.getGasReward(mc.msgs[i], baseFee)
|
|
mc.gasReward = new(big.Int).Sub(mc.gasReward, gasReward)
|
|
mc.gasLimit -= mc.msgs[i].Message.GasLimit
|
|
if mc.gasLimit > 0 {
|
|
mc.gasPerf = mp.getGasPerf(mc.gasReward, mc.gasLimit)
|
|
if mc.bp != 0 {
|
|
mc.setEffPerf()
|
|
}
|
|
} else {
|
|
mc.gasPerf = 0
|
|
mc.effPerf = 0
|
|
}
|
|
i--
|
|
}
|
|
|
|
if i < 0 {
|
|
mc.msgs = nil
|
|
mc.valid = false
|
|
} else {
|
|
mc.msgs = mc.msgs[:i+1]
|
|
}
|
|
|
|
// TODO: if the trim above is a no-op, this (may) needlessly invalidates the next chain
|
|
if mc.next != nil {
|
|
mc.next.Invalidate()
|
|
mc.next = nil
|
|
}
|
|
}
|
|
|
|
func (mc *msgChain) Invalidate() {
|
|
mc.valid = false
|
|
mc.msgs = nil
|
|
if mc.next != nil {
|
|
mc.next.Invalidate()
|
|
mc.next = nil
|
|
}
|
|
}
|
|
|
|
func (mc *msgChain) SetEffectivePerf(bp float64) {
|
|
mc.bp = bp
|
|
mc.setEffPerf()
|
|
}
|
|
|
|
func (mc *msgChain) setEffPerf() {
|
|
effPerf := mc.gasPerf * mc.bp
|
|
if effPerf > 0 && mc.prev != nil {
|
|
effPerfWithParent := (effPerf*float64(mc.gasLimit) + mc.prev.effPerf*float64(mc.prev.gasLimit)) / float64(mc.gasLimit+mc.prev.gasLimit)
|
|
mc.parentOffset = effPerf - effPerfWithParent
|
|
effPerf = effPerfWithParent
|
|
}
|
|
mc.effPerf = effPerf
|
|
|
|
}
|
|
|
|
func (mc *msgChain) SetNullEffectivePerf() {
|
|
if mc.gasPerf < 0 {
|
|
mc.effPerf = mc.gasPerf
|
|
} else {
|
|
mc.effPerf = 0
|
|
}
|
|
}
|
|
|
|
func (mc *msgChain) BeforeEffective(other *msgChain) bool {
|
|
// move merged chains to the front so we can discard them earlier
|
|
return (mc.merged && !other.merged) ||
|
|
(mc.gasPerf >= 0 && other.gasPerf < 0) ||
|
|
mc.effPerf > other.effPerf ||
|
|
(mc.effPerf == other.effPerf && mc.gasPerf > other.gasPerf) ||
|
|
(mc.effPerf == other.effPerf && mc.gasPerf == other.gasPerf && mc.gasReward.Cmp(other.gasReward) > 0)
|
|
}
|
|
|
|
func shuffleChains(lst []*msgChain) {
|
|
for i := range lst {
|
|
j := rand.Intn(i + 1)
|
|
lst[i], lst[j] = lst[j], lst[i]
|
|
}
|
|
}
|