nearly optimal message selection for a given ticket quality
Signed-off-by: Jakub Sztandera <kubuxu@protocol.ai>
This commit is contained in:
parent
657b390193
commit
ca803d99fe
@ -3,6 +3,7 @@ package messagepool
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"math/big"
|
"math/big"
|
||||||
|
"math/rand"
|
||||||
"sort"
|
"sort"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -19,26 +20,283 @@ import (
|
|||||||
|
|
||||||
var bigBlockGasLimit = big.NewInt(build.BlockGasLimit)
|
var bigBlockGasLimit = big.NewInt(build.BlockGasLimit)
|
||||||
|
|
||||||
|
const MaxBlocks = 15
|
||||||
|
|
||||||
type msgChain struct {
|
type msgChain struct {
|
||||||
msgs []*types.SignedMessage
|
msgs []*types.SignedMessage
|
||||||
gasReward *big.Int
|
gasReward *big.Int
|
||||||
gasLimit int64
|
gasLimit int64
|
||||||
gasPerf float64
|
gasPerf float64
|
||||||
|
effPerf float64
|
||||||
valid bool
|
valid bool
|
||||||
|
merged bool
|
||||||
next *msgChain
|
next *msgChain
|
||||||
|
prev *msgChain
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mp *MessagePool) SelectMessages(ts *types.TipSet) ([]*types.SignedMessage, error) {
|
func (mp *MessagePool) SelectMessages(ts *types.TipSet, tq float64) ([]*types.SignedMessage, error) {
|
||||||
mp.curTsLk.Lock()
|
mp.curTsLk.Lock()
|
||||||
defer mp.curTsLk.Unlock()
|
defer mp.curTsLk.Unlock()
|
||||||
|
|
||||||
mp.lk.Lock()
|
mp.lk.Lock()
|
||||||
defer mp.lk.Unlock()
|
defer mp.lk.Unlock()
|
||||||
|
|
||||||
return mp.selectMessages(mp.curTs, ts)
|
// if the ticket quality is high enough that the first block has higher probability
|
||||||
|
// than any other block, then we don't bother with optimal selection because the
|
||||||
|
// first block will always have higher effective performance
|
||||||
|
if tq > 0.84 {
|
||||||
|
return mp.selectMessagesGreedy(mp.curTs, ts)
|
||||||
|
}
|
||||||
|
|
||||||
|
return mp.selectMessagesOptimal(mp.curTs, ts, tq)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mp *MessagePool) selectMessages(curTs, ts *types.TipSet) ([]*types.SignedMessage, error) {
|
func (mp *MessagePool) selectMessagesOptimal(curTs, ts *types.TipSet, tq float64) ([]*types.SignedMessage, error) {
|
||||||
|
start := time.Now()
|
||||||
|
|
||||||
|
baseFee, err := mp.api.ChainComputeBaseFee(context.TODO(), ts)
|
||||||
|
if err != nil {
|
||||||
|
return nil, xerrors.Errorf("computing basefee: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 0. Load messages from the targeti tipset; if it is the same as the current tipset in
|
||||||
|
// the mpoll, then this is just the pending messages
|
||||||
|
pending, err := mp.getPendingMessages(curTs, ts)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(pending) == 0 {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// defer only here so if we have no pending messages we don't spam
|
||||||
|
defer func() {
|
||||||
|
log.Infow("message selection done", "took", time.Since(start))
|
||||||
|
}()
|
||||||
|
|
||||||
|
// 0b. Select all priority messages that fit in the block
|
||||||
|
minGas := int64(gasguess.MinGas)
|
||||||
|
result, gasLimit := mp.selectPriorityMessages(pending, baseFee, ts)
|
||||||
|
|
||||||
|
// have we filled the block?
|
||||||
|
if gasLimit < minGas {
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// 1. Create a list of dependent message chains with maximal gas reward per limit consumed
|
||||||
|
startChains := time.Now()
|
||||||
|
var chains []*msgChain
|
||||||
|
for actor, mset := range pending {
|
||||||
|
next := mp.createMessageChains(actor, mset, baseFee, ts)
|
||||||
|
chains = append(chains, next...)
|
||||||
|
}
|
||||||
|
log.Infow("create message chains done", "took", time.Since(startChains))
|
||||||
|
|
||||||
|
// 2. Sort the chains
|
||||||
|
sort.Slice(chains, func(i, j int) bool {
|
||||||
|
return chains[i].Before(chains[j])
|
||||||
|
})
|
||||||
|
|
||||||
|
if len(chains) != 0 && chains[0].gasPerf < 0 {
|
||||||
|
log.Warnw("all messages in mpool have non-positive gas performance", "bestGasPerf", chains[0].gasPerf)
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// 3. Parition chains into blocks (without trimming)
|
||||||
|
// we use the residual gas limit from the priority message selection as those message
|
||||||
|
// will be unconditionally included
|
||||||
|
residualGasLimit := gasLimit
|
||||||
|
nextChain := 0
|
||||||
|
partitions := make([][]*msgChain, MaxBlocks)
|
||||||
|
for i := 0; i < MaxBlocks && nextChain < len(chains); i++ {
|
||||||
|
gasLimit := residualGasLimit
|
||||||
|
for nextChain < len(chains) {
|
||||||
|
chain := chains[nextChain]
|
||||||
|
partitions[i] = append(partitions[i], chain)
|
||||||
|
nextChain++
|
||||||
|
gasLimit -= chain.gasLimit
|
||||||
|
if gasLimit < minGas {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 4. Compute effective performance for each chain, based on the partition they fall into
|
||||||
|
// The effective performance is the gasPerf of the chain * block probability
|
||||||
|
// Note that we don't have to do anything special about residues that didn't fit in any
|
||||||
|
// partition because these will already have an effective perf of 0 and will be pushed
|
||||||
|
// to the end by virtue of smaller gasPerf.
|
||||||
|
blockProb := mp.blockProbabilities(tq)
|
||||||
|
for i := 0; i < MaxBlocks; i++ {
|
||||||
|
for _, chain := range partitions[i] {
|
||||||
|
chain.SetEffectivePerf(blockProb[i])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 5. Resort the chains based on effective performance
|
||||||
|
sort.Slice(chains, func(i, j int) bool {
|
||||||
|
return chains[i].BeforeEffective(chains[j])
|
||||||
|
})
|
||||||
|
|
||||||
|
// 6. Merge the head chains to produce the list of messages selected for inclusion
|
||||||
|
// subject to the residual gas limit
|
||||||
|
// When a chain is merged in, all its previous dependent chains *must* also be
|
||||||
|
// merged in or we'll have a broken block
|
||||||
|
startMerge := time.Now()
|
||||||
|
last := len(chains)
|
||||||
|
for i, chain := range chains {
|
||||||
|
// did we run out of performing chains?
|
||||||
|
if chain.gasPerf < 0 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
// has it already been merged?
|
||||||
|
if chain.merged {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// compute the dependencies that must be merged and the gas limit including deps
|
||||||
|
chainGasLimit := chain.gasLimit
|
||||||
|
var chainDeps []*msgChain
|
||||||
|
for curChain := chain.prev; curChain != nil && !curChain.merged; curChain = curChain.prev {
|
||||||
|
chainDeps = append(chainDeps, curChain)
|
||||||
|
chainGasLimit += curChain.gasLimit
|
||||||
|
}
|
||||||
|
|
||||||
|
// does it all fit in the block?
|
||||||
|
if chainGasLimit <= gasLimit {
|
||||||
|
// include it together with all dependencies
|
||||||
|
for i := len(chainDeps) - 1; i >= 0; i-- {
|
||||||
|
curChain := chainDeps[i]
|
||||||
|
curChain.merged = true
|
||||||
|
result = append(result, curChain.msgs...)
|
||||||
|
}
|
||||||
|
|
||||||
|
chain.merged = true
|
||||||
|
result = append(result, chain.msgs...)
|
||||||
|
gasLimit -= chainGasLimit
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// we can't fit this chain and its dependencies because of block gasLimit -- we are
|
||||||
|
// at the edge
|
||||||
|
last = i
|
||||||
|
break
|
||||||
|
}
|
||||||
|
log.Infow("merge message chains done", "took", time.Since(startMerge))
|
||||||
|
|
||||||
|
// 7. We have reached the edge of what can fit wholesale; if we still hae available
|
||||||
|
// gasLimit to pack some more chains, then trim the last chain and push it down.
|
||||||
|
// Trimming invalidaates subsequent dependent chains so that they can't be selected
|
||||||
|
// as their dependency cannot be (fully) included.
|
||||||
|
// We do this in a loop because the blocker might have been inordinately large and
|
||||||
|
// we might have to do it multiple times to satisfy tail packing
|
||||||
|
startTail := time.Now()
|
||||||
|
tailLoop:
|
||||||
|
for gasLimit >= minGas && last < len(chains) {
|
||||||
|
// trim if necessary
|
||||||
|
if chains[last].gasLimit > gasLimit {
|
||||||
|
chains[last].Trim(gasLimit, mp, baseFee, ts, false)
|
||||||
|
}
|
||||||
|
|
||||||
|
// push down if it hasn't been invalidated
|
||||||
|
if chains[last].valid {
|
||||||
|
for i := last; i < len(chains)-1; i++ {
|
||||||
|
if chains[i].BeforeEffective(chains[i+1]) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
chains[i], chains[i+1] = chains[i+1], chains[i]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// select the next (valid and fitting) chain and its dependencies for inclusion
|
||||||
|
for i, chain := range chains[last:] {
|
||||||
|
// has the chain been invalidated?
|
||||||
|
if !chain.valid {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// has it already been merged?
|
||||||
|
if chain.merged {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// if gasPerf < 0 we have no more profitable chains
|
||||||
|
if chain.gasPerf < 0 {
|
||||||
|
break tailLoop
|
||||||
|
}
|
||||||
|
|
||||||
|
// compute the dependencies that must be merged and the gas limit including deps
|
||||||
|
chainGasLimit := chain.gasLimit
|
||||||
|
depGasLimit := int64(0)
|
||||||
|
var chainDeps []*msgChain
|
||||||
|
for curChain := chain.prev; curChain != nil && !curChain.merged; curChain = curChain.prev {
|
||||||
|
chainDeps = append(chainDeps, curChain)
|
||||||
|
chainGasLimit += curChain.gasLimit
|
||||||
|
depGasLimit += curChain.gasLimit
|
||||||
|
}
|
||||||
|
|
||||||
|
// does it all fit in the bock
|
||||||
|
if chainGasLimit <= gasLimit {
|
||||||
|
// include it together with all dependencies
|
||||||
|
for i := len(chainDeps) - 1; i >= 0; i-- {
|
||||||
|
curChain := chainDeps[i]
|
||||||
|
curChain.merged = true
|
||||||
|
result = append(result, curChain.msgs...)
|
||||||
|
}
|
||||||
|
|
||||||
|
chain.merged = true
|
||||||
|
result = append(result, chain.msgs...)
|
||||||
|
gasLimit -= chainGasLimit
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// it doesn't all fit; now we have to take into account the dependent chains before
|
||||||
|
// making a decision about trimming or invalidating.
|
||||||
|
// if the dependencies exceed the gas limit, then we must invalidate the chain
|
||||||
|
// as it can never be included.
|
||||||
|
// Otherwise we can just trim and continue
|
||||||
|
if depGasLimit > gasLimit {
|
||||||
|
chain.Invalidate()
|
||||||
|
last += i + 1
|
||||||
|
continue tailLoop
|
||||||
|
}
|
||||||
|
|
||||||
|
// dependencies fit, just trim it
|
||||||
|
chain.Trim(gasLimit-depGasLimit, mp, baseFee, ts, false)
|
||||||
|
last += i
|
||||||
|
continue tailLoop
|
||||||
|
}
|
||||||
|
|
||||||
|
// the merge loop ended after processing all the chains and we we probably have still
|
||||||
|
// gas to spare; end the loop.
|
||||||
|
break
|
||||||
|
}
|
||||||
|
log.Infow("pack tail chains done", "took", time.Since(startTail))
|
||||||
|
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mp *MessagePool) blockProbabilities(tq float64) []float64 {
|
||||||
|
// TODO FIXME fit in the actual probability distribution
|
||||||
|
// this just makes a dummy random distribution for testing purposes
|
||||||
|
bps := make([]float64, MaxBlocks)
|
||||||
|
norm := 0.0
|
||||||
|
for i := 0; i < MaxBlocks; i++ {
|
||||||
|
p := rand.Float64()
|
||||||
|
bps[i] = p
|
||||||
|
norm += p
|
||||||
|
}
|
||||||
|
// normalize to make it a distribution
|
||||||
|
for i := 0; i < MaxBlocks; i++ {
|
||||||
|
bps[i] /= norm
|
||||||
|
}
|
||||||
|
|
||||||
|
return bps
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mp *MessagePool) selectMessagesGreedy(curTs, ts *types.TipSet) ([]*types.SignedMessage, error) {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
|
|
||||||
baseFee, err := mp.api.ChainComputeBaseFee(context.TODO(), ts)
|
baseFee, err := mp.api.ChainComputeBaseFee(context.TODO(), ts)
|
||||||
@ -95,18 +353,18 @@ func (mp *MessagePool) selectMessages(curTs, ts *types.TipSet) ([]*types.SignedM
|
|||||||
startMerge := time.Now()
|
startMerge := time.Now()
|
||||||
last := len(chains)
|
last := len(chains)
|
||||||
for i, chain := range chains {
|
for i, chain := range chains {
|
||||||
// does it fit in the block?
|
|
||||||
if chain.gasLimit <= gasLimit && chain.gasPerf >= 0 {
|
|
||||||
gasLimit -= chain.gasLimit
|
|
||||||
result = append(result, chain.msgs...)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// did we run out of performing chains?
|
// did we run out of performing chains?
|
||||||
if chain.gasPerf < 0 {
|
if chain.gasPerf < 0 {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// does it fit in the block?
|
||||||
|
if chain.gasLimit <= gasLimit {
|
||||||
|
gasLimit -= chain.gasLimit
|
||||||
|
result = append(result, chain.msgs...)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
// we can't fit this chain because of block gasLimit -- we are at the edge
|
// we can't fit this chain because of block gasLimit -- we are at the edge
|
||||||
last = i
|
last = i
|
||||||
break
|
break
|
||||||
@ -137,30 +395,31 @@ tailLoop:
|
|||||||
|
|
||||||
// select the next (valid and fitting) chain for inclusion
|
// select the next (valid and fitting) chain for inclusion
|
||||||
for i, chain := range chains[last:] {
|
for i, chain := range chains[last:] {
|
||||||
// has the chain been invalidated
|
// has the chain been invalidated?
|
||||||
if !chain.valid {
|
if !chain.valid {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
// does it fit in the bock?
|
|
||||||
if chain.gasLimit <= gasLimit && chain.gasPerf >= 0 {
|
|
||||||
gasLimit -= chain.gasLimit
|
|
||||||
result = append(result, chain.msgs...)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// if gasPerf < 0 we have no more profitable chains
|
// if gasPerf < 0 we have no more profitable chains
|
||||||
if chain.gasPerf < 0 {
|
if chain.gasPerf < 0 {
|
||||||
break tailLoop
|
break tailLoop
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// does it fit in the bock?
|
||||||
|
if chain.gasLimit <= gasLimit {
|
||||||
|
gasLimit -= chain.gasLimit
|
||||||
|
result = append(result, chain.msgs...)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
// this chain needs to be trimmed
|
// this chain needs to be trimmed
|
||||||
last += i
|
last += i
|
||||||
continue tailLoop
|
continue tailLoop
|
||||||
}
|
}
|
||||||
|
|
||||||
// the merge loop ended after processing all the chains and we probably still have gas to spare
|
// the merge loop ended after processing all the chains and we probably still have
|
||||||
// -- mark the end.
|
// gas to spare; end the loop
|
||||||
last = len(chains)
|
break
|
||||||
}
|
}
|
||||||
log.Infow("pack tail chains done", "took", time.Since(startTail))
|
log.Infow("pack tail chains done", "took", time.Since(startTail))
|
||||||
|
|
||||||
@ -533,6 +792,10 @@ func (mp *MessagePool) createMessageChains(actor address.Address, mset map[uint6
|
|||||||
chains[i].next = chains[i+1]
|
chains[i].next = chains[i+1]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for i := len(chains) - 1; i > 0; i-- {
|
||||||
|
chains[i].prev = chains[i-1]
|
||||||
|
}
|
||||||
|
|
||||||
return chains
|
return chains
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -563,16 +826,26 @@ func (mc *msgChain) Trim(gasLimit int64, mp *MessagePool, baseFee types.BigInt,
|
|||||||
}
|
}
|
||||||
|
|
||||||
if mc.next != nil {
|
if mc.next != nil {
|
||||||
mc.next.invalidate()
|
mc.next.Invalidate()
|
||||||
mc.next = nil
|
mc.next = nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mc *msgChain) invalidate() {
|
func (mc *msgChain) Invalidate() {
|
||||||
mc.valid = false
|
mc.valid = false
|
||||||
mc.msgs = nil
|
mc.msgs = nil
|
||||||
if mc.next != nil {
|
if mc.next != nil {
|
||||||
mc.next.invalidate()
|
mc.next.Invalidate()
|
||||||
mc.next = nil
|
mc.next = nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (mc *msgChain) SetEffectivePerf(bp float64) {
|
||||||
|
mc.effPerf = mc.gasPerf * bp
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mc *msgChain) BeforeEffective(other *msgChain) bool {
|
||||||
|
return mc.effPerf > other.effPerf ||
|
||||||
|
(mc.effPerf == other.effPerf && mc.gasPerf > other.gasPerf) ||
|
||||||
|
(mc.effPerf == other.effPerf && mc.gasPerf == other.gasPerf && mc.gasReward.Cmp(other.gasReward) > 0)
|
||||||
|
}
|
||||||
|
@ -403,7 +403,7 @@ func TestBasicMessageSelection(t *testing.T) {
|
|||||||
mustAdd(t, mp, m)
|
mustAdd(t, mp, m)
|
||||||
}
|
}
|
||||||
|
|
||||||
msgs, err := mp.SelectMessages(ts)
|
msgs, err := mp.SelectMessages(ts, 1.0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -471,7 +471,7 @@ func TestBasicMessageSelection(t *testing.T) {
|
|||||||
tma.setStateNonce(a1, 10)
|
tma.setStateNonce(a1, 10)
|
||||||
tma.setStateNonce(a2, 10)
|
tma.setStateNonce(a2, 10)
|
||||||
|
|
||||||
msgs, err = mp.SelectMessages(ts3)
|
msgs, err = mp.SelectMessages(ts3, 1.0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -545,7 +545,7 @@ func TestMessageSelectionTrimming(t *testing.T) {
|
|||||||
mustAdd(t, mp, m)
|
mustAdd(t, mp, m)
|
||||||
}
|
}
|
||||||
|
|
||||||
msgs, err := mp.SelectMessages(ts)
|
msgs, err := mp.SelectMessages(ts, 1.0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -609,7 +609,7 @@ func TestPriorityMessageSelection(t *testing.T) {
|
|||||||
mustAdd(t, mp, m)
|
mustAdd(t, mp, m)
|
||||||
}
|
}
|
||||||
|
|
||||||
msgs, err := mp.SelectMessages(ts)
|
msgs, err := mp.SelectMessages(ts, 1.0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user