Mempool: Selection logic should respect block message limits
This commit is contained in:
parent
7e3fea2489
commit
d83d585327
@ -92,7 +92,7 @@ func (mp *MessagePool) selectMessagesOptimal(ctx context.Context, curTs, ts *typ
|
|||||||
result, gasLimit := mp.selectPriorityMessages(ctx, pending, baseFee, ts)
|
result, gasLimit := mp.selectPriorityMessages(ctx, pending, baseFee, ts)
|
||||||
|
|
||||||
// have we filled the block?
|
// have we filled the block?
|
||||||
if gasLimit < minGas {
|
if gasLimit < minGas || len(result) >= build.BlockMessageLimit {
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -117,19 +117,21 @@ func (mp *MessagePool) selectMessagesOptimal(ctx context.Context, curTs, ts *typ
|
|||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// 3. Parition chains into blocks (without trimming)
|
// 3. Partition chains into blocks (without trimming)
|
||||||
// we use the full blockGasLimit (as opposed to the residual gas limit from the
|
// we use the full blockGasLimit (as opposed to the residual gas limit from the
|
||||||
// priority message selection) as we have to account for what other miners are doing
|
// priority message selection) as we have to account for what other block providers are doing
|
||||||
nextChain := 0
|
nextChain := 0
|
||||||
partitions := make([][]*msgChain, MaxBlocks)
|
partitions := make([][]*msgChain, MaxBlocks)
|
||||||
for i := 0; i < MaxBlocks && nextChain < len(chains); i++ {
|
for i := 0; i < MaxBlocks && nextChain < len(chains); i++ {
|
||||||
gasLimit := int64(build.BlockGasLimit)
|
gasLimit := int64(build.BlockGasLimit)
|
||||||
|
msgLimit := build.BlockMessageLimit
|
||||||
for nextChain < len(chains) {
|
for nextChain < len(chains) {
|
||||||
chain := chains[nextChain]
|
chain := chains[nextChain]
|
||||||
nextChain++
|
nextChain++
|
||||||
partitions[i] = append(partitions[i], chain)
|
partitions[i] = append(partitions[i], chain)
|
||||||
gasLimit -= chain.gasLimit
|
gasLimit -= chain.gasLimit
|
||||||
if gasLimit < minGas {
|
msgLimit -= len(chain.msgs)
|
||||||
|
if gasLimit < minGas || msgLimit <= 0 {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -158,7 +160,7 @@ func (mp *MessagePool) selectMessagesOptimal(ctx context.Context, curTs, ts *typ
|
|||||||
})
|
})
|
||||||
|
|
||||||
// 6. Merge the head chains to produce the list of messages selected for inclusion
|
// 6. Merge the head chains to produce the list of messages selected for inclusion
|
||||||
// subject to the residual gas limit
|
// subject to the residual block limits
|
||||||
// When a chain is merged in, all its previous dependent chains *must* also be
|
// When a chain is merged in, all its previous dependent chains *must* also be
|
||||||
// merged in or we'll have a broken block
|
// merged in or we'll have a broken block
|
||||||
startMerge := time.Now()
|
startMerge := time.Now()
|
||||||
@ -176,14 +178,16 @@ func (mp *MessagePool) selectMessagesOptimal(ctx context.Context, curTs, ts *typ
|
|||||||
|
|
||||||
// compute the dependencies that must be merged and the gas limit including deps
|
// compute the dependencies that must be merged and the gas limit including deps
|
||||||
chainGasLimit := chain.gasLimit
|
chainGasLimit := chain.gasLimit
|
||||||
|
chainMsgLimit := len(chain.msgs)
|
||||||
var chainDeps []*msgChain
|
var chainDeps []*msgChain
|
||||||
for curChain := chain.prev; curChain != nil && !curChain.merged; curChain = curChain.prev {
|
for curChain := chain.prev; curChain != nil && !curChain.merged; curChain = curChain.prev {
|
||||||
chainDeps = append(chainDeps, curChain)
|
chainDeps = append(chainDeps, curChain)
|
||||||
chainGasLimit += curChain.gasLimit
|
chainGasLimit += curChain.gasLimit
|
||||||
|
chainMsgLimit += len(curChain.msgs)
|
||||||
}
|
}
|
||||||
|
|
||||||
// does it all fit in the block?
|
// does it all fit in the block?
|
||||||
if chainGasLimit <= gasLimit {
|
if chainGasLimit <= gasLimit && chainMsgLimit+len(result) <= build.BlockMessageLimit {
|
||||||
// include it together with all dependencies
|
// include it together with all dependencies
|
||||||
for i := len(chainDeps) - 1; i >= 0; i-- {
|
for i := len(chainDeps) - 1; i >= 0; i-- {
|
||||||
curChain := chainDeps[i]
|
curChain := chainDeps[i]
|
||||||
@ -192,7 +196,7 @@ func (mp *MessagePool) selectMessagesOptimal(ctx context.Context, curTs, ts *typ
|
|||||||
}
|
}
|
||||||
|
|
||||||
chain.merged = true
|
chain.merged = true
|
||||||
// adjust the effective pefromance for all subsequent chains
|
// adjust the effective performance for all subsequent chains
|
||||||
if next := chain.next; next != nil && next.effPerf > 0 {
|
if next := chain.next; next != nil && next.effPerf > 0 {
|
||||||
next.effPerf += next.parentOffset
|
next.effPerf += next.parentOffset
|
||||||
for next = next.next; next != nil && next.effPerf > 0; next = next.next {
|
for next = next.next; next != nil && next.effPerf > 0; next = next.next {
|
||||||
@ -222,7 +226,7 @@ func (mp *MessagePool) selectMessagesOptimal(ctx context.Context, curTs, ts *typ
|
|||||||
|
|
||||||
// 7. We have reached the edge of what can fit wholesale; if we still hae available
|
// 7. We have reached the edge of what can fit wholesale; if we still hae available
|
||||||
// gasLimit to pack some more chains, then trim the last chain and push it down.
|
// gasLimit to pack some more chains, then trim the last chain and push it down.
|
||||||
// Trimming invalidaates subsequent dependent chains so that they can't be selected
|
// Trimming invalidates subsequent dependent chains so that they can't be selected
|
||||||
// as their dependency cannot be (fully) included.
|
// as their dependency cannot be (fully) included.
|
||||||
// We do this in a loop because the blocker might have been inordinately large and
|
// We do this in a loop because the blocker might have been inordinately large and
|
||||||
// we might have to do it multiple times to satisfy tail packing
|
// we might have to do it multiple times to satisfy tail packing
|
||||||
@ -231,7 +235,7 @@ tailLoop:
|
|||||||
for gasLimit >= minGas && last < len(chains) {
|
for gasLimit >= minGas && last < len(chains) {
|
||||||
// trim if necessary
|
// trim if necessary
|
||||||
if chains[last].gasLimit > gasLimit {
|
if chains[last].gasLimit > gasLimit {
|
||||||
chains[last].Trim(gasLimit, mp, baseFee)
|
chains[last].Trim(gasLimit, build.BlockMessageLimit-len(result), mp, baseFee)
|
||||||
}
|
}
|
||||||
|
|
||||||
// push down if it hasn't been invalidated
|
// push down if it hasn't been invalidated
|
||||||
@ -263,16 +267,20 @@ tailLoop:
|
|||||||
|
|
||||||
// compute the dependencies that must be merged and the gas limit including deps
|
// compute the dependencies that must be merged and the gas limit including deps
|
||||||
chainGasLimit := chain.gasLimit
|
chainGasLimit := chain.gasLimit
|
||||||
|
chainMsgLimit := len(chain.msgs)
|
||||||
depGasLimit := int64(0)
|
depGasLimit := int64(0)
|
||||||
|
depMsgLimit := 0
|
||||||
var chainDeps []*msgChain
|
var chainDeps []*msgChain
|
||||||
for curChain := chain.prev; curChain != nil && !curChain.merged; curChain = curChain.prev {
|
for curChain := chain.prev; curChain != nil && !curChain.merged; curChain = curChain.prev {
|
||||||
chainDeps = append(chainDeps, curChain)
|
chainDeps = append(chainDeps, curChain)
|
||||||
chainGasLimit += curChain.gasLimit
|
chainGasLimit += curChain.gasLimit
|
||||||
|
chainMsgLimit += len(curChain.msgs)
|
||||||
depGasLimit += curChain.gasLimit
|
depGasLimit += curChain.gasLimit
|
||||||
|
depMsgLimit += len(curChain.msgs)
|
||||||
}
|
}
|
||||||
|
|
||||||
// does it all fit in the bock
|
// does it all fit in the bock
|
||||||
if chainGasLimit <= gasLimit {
|
if chainGasLimit <= gasLimit && len(result)+chainMsgLimit <= build.BlockMessageLimit {
|
||||||
// include it together with all dependencies
|
// include it together with all dependencies
|
||||||
for i := len(chainDeps) - 1; i >= 0; i-- {
|
for i := len(chainDeps) - 1; i >= 0; i-- {
|
||||||
curChain := chainDeps[i]
|
curChain := chainDeps[i]
|
||||||
@ -288,17 +296,17 @@ tailLoop:
|
|||||||
|
|
||||||
// it doesn't all fit; now we have to take into account the dependent chains before
|
// it doesn't all fit; now we have to take into account the dependent chains before
|
||||||
// making a decision about trimming or invalidating.
|
// making a decision about trimming or invalidating.
|
||||||
// if the dependencies exceed the gas limit, then we must invalidate the chain
|
// if the dependencies exceed the block limits, then we must invalidate the chain
|
||||||
// as it can never be included.
|
// as it can never be included.
|
||||||
// Otherwise we can just trim and continue
|
// Otherwise we can just trim and continue
|
||||||
if depGasLimit > gasLimit {
|
if depGasLimit > gasLimit || len(result)+depMsgLimit >= build.BlockMessageLimit {
|
||||||
chain.Invalidate()
|
chain.Invalidate()
|
||||||
last += i + 1
|
last += i + 1
|
||||||
continue tailLoop
|
continue tailLoop
|
||||||
}
|
}
|
||||||
|
|
||||||
// dependencies fit, just trim it
|
// dependencies fit, just trim it
|
||||||
chain.Trim(gasLimit-depGasLimit, mp, baseFee)
|
chain.Trim(gasLimit-depGasLimit, build.BlockMessageLimit-len(result)-depMsgLimit, mp, baseFee)
|
||||||
last += i
|
last += i
|
||||||
continue tailLoop
|
continue tailLoop
|
||||||
}
|
}
|
||||||
@ -311,9 +319,9 @@ tailLoop:
|
|||||||
log.Infow("pack tail chains done", "took", dt)
|
log.Infow("pack tail chains done", "took", dt)
|
||||||
}
|
}
|
||||||
|
|
||||||
// if we have gasLimit to spare, pick some random (non-negative) chains to fill the block
|
// if we have room to spare, pick some random (non-negative) chains to fill the block
|
||||||
// we pick randomly so that we minimize the probability of duplication among all miners
|
// we pick randomly so that we minimize the probability of duplication among all block producers
|
||||||
if gasLimit >= minGas {
|
if gasLimit >= minGas && len(result) <= build.BlockMessageLimit {
|
||||||
randomCount := 0
|
randomCount := 0
|
||||||
|
|
||||||
startRandom := time.Now()
|
startRandom := time.Now()
|
||||||
@ -321,7 +329,7 @@ tailLoop:
|
|||||||
|
|
||||||
for _, chain := range chains {
|
for _, chain := range chains {
|
||||||
// have we filled the block
|
// have we filled the block
|
||||||
if gasLimit < minGas {
|
if gasLimit < minGas || len(result) >= build.BlockMessageLimit {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -337,23 +345,27 @@ tailLoop:
|
|||||||
|
|
||||||
// compute the dependencies that must be merged and the gas limit including deps
|
// compute the dependencies that must be merged and the gas limit including deps
|
||||||
chainGasLimit := chain.gasLimit
|
chainGasLimit := chain.gasLimit
|
||||||
|
chainMsgLimit := len(chain.msgs)
|
||||||
depGasLimit := int64(0)
|
depGasLimit := int64(0)
|
||||||
|
depMsgLimit := 0
|
||||||
var chainDeps []*msgChain
|
var chainDeps []*msgChain
|
||||||
for curChain := chain.prev; curChain != nil && !curChain.merged; curChain = curChain.prev {
|
for curChain := chain.prev; curChain != nil && !curChain.merged; curChain = curChain.prev {
|
||||||
chainDeps = append(chainDeps, curChain)
|
chainDeps = append(chainDeps, curChain)
|
||||||
chainGasLimit += curChain.gasLimit
|
chainGasLimit += curChain.gasLimit
|
||||||
|
chainMsgLimit += len(curChain.msgs)
|
||||||
depGasLimit += curChain.gasLimit
|
depGasLimit += curChain.gasLimit
|
||||||
|
depMsgLimit += len(curChain.msgs)
|
||||||
}
|
}
|
||||||
|
|
||||||
// do the deps fit? if the deps won't fit, invalidate the chain
|
// do the deps fit? if the deps won't fit, invalidate the chain
|
||||||
if depGasLimit > gasLimit {
|
if depGasLimit > gasLimit || len(result)+depMsgLimit > build.BlockMessageLimit {
|
||||||
chain.Invalidate()
|
chain.Invalidate()
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// do they fit as is? if it doesn't, trim to make it fit if possible
|
// do they fit as is? if it doesn't, trim to make it fit if possible
|
||||||
if chainGasLimit > gasLimit {
|
if chainGasLimit > gasLimit || len(result)+chainMsgLimit > build.BlockMessageLimit {
|
||||||
chain.Trim(gasLimit-depGasLimit, mp, baseFee)
|
chain.Trim(gasLimit-depGasLimit, build.BlockMessageLimit-len(result)-depMsgLimit, mp, baseFee)
|
||||||
|
|
||||||
if !chain.valid {
|
if !chain.valid {
|
||||||
continue
|
continue
|
||||||
@ -416,7 +428,7 @@ func (mp *MessagePool) selectMessagesGreedy(ctx context.Context, curTs, ts *type
|
|||||||
result, gasLimit := mp.selectPriorityMessages(ctx, pending, baseFee, ts)
|
result, gasLimit := mp.selectPriorityMessages(ctx, pending, baseFee, ts)
|
||||||
|
|
||||||
// have we filled the block?
|
// have we filled the block?
|
||||||
if gasLimit < minGas {
|
if gasLimit < minGas || len(result) > build.BlockMessageLimit {
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -442,7 +454,7 @@ func (mp *MessagePool) selectMessagesGreedy(ctx context.Context, curTs, ts *type
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 3. Merge the head chains to produce the list of messages selected for inclusion, subject to
|
// 3. Merge the head chains to produce the list of messages selected for inclusion, subject to
|
||||||
// the block gas limit.
|
// the block gas and message limits.
|
||||||
startMerge := time.Now()
|
startMerge := time.Now()
|
||||||
last := len(chains)
|
last := len(chains)
|
||||||
for i, chain := range chains {
|
for i, chain := range chains {
|
||||||
@ -452,13 +464,13 @@ func (mp *MessagePool) selectMessagesGreedy(ctx context.Context, curTs, ts *type
|
|||||||
}
|
}
|
||||||
|
|
||||||
// does it fit in the block?
|
// does it fit in the block?
|
||||||
if chain.gasLimit <= gasLimit {
|
if chain.gasLimit <= gasLimit && len(result)+len(chain.msgs) <= build.BlockMessageLimit {
|
||||||
gasLimit -= chain.gasLimit
|
gasLimit -= chain.gasLimit
|
||||||
result = append(result, chain.msgs...)
|
result = append(result, chain.msgs...)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// we can't fit this chain because of block gasLimit -- we are at the edge
|
// we can't fit this chain because of block limits -- we are at the edge
|
||||||
last = i
|
last = i
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
@ -476,7 +488,7 @@ func (mp *MessagePool) selectMessagesGreedy(ctx context.Context, curTs, ts *type
|
|||||||
tailLoop:
|
tailLoop:
|
||||||
for gasLimit >= minGas && last < len(chains) {
|
for gasLimit >= minGas && last < len(chains) {
|
||||||
// trim
|
// trim
|
||||||
chains[last].Trim(gasLimit, mp, baseFee)
|
chains[last].Trim(gasLimit, build.BlockMessageLimit-len(result), mp, baseFee)
|
||||||
|
|
||||||
// push down if it hasn't been invalidated
|
// push down if it hasn't been invalidated
|
||||||
if chains[last].valid {
|
if chains[last].valid {
|
||||||
@ -501,7 +513,7 @@ tailLoop:
|
|||||||
}
|
}
|
||||||
|
|
||||||
// does it fit in the bock?
|
// does it fit in the bock?
|
||||||
if chain.gasLimit <= gasLimit {
|
if chain.gasLimit <= gasLimit && len(result)+len(chain.msgs) <= build.BlockMessageLimit {
|
||||||
gasLimit -= chain.gasLimit
|
gasLimit -= chain.gasLimit
|
||||||
result = append(result, chain.msgs...)
|
result = append(result, chain.msgs...)
|
||||||
continue
|
continue
|
||||||
@ -778,11 +790,16 @@ func (mp *MessagePool) createMessageChains(actor address.Address, mset map[uint6
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// if we have more messages from this sender than can fit in a block, drop the extra ones
|
||||||
|
if len(msgs) > build.BlockMessageLimit {
|
||||||
|
msgs = msgs[:build.BlockMessageLimit]
|
||||||
|
}
|
||||||
|
|
||||||
// ok, now we can construct the chains using the messages we have
|
// ok, now we can construct the chains using the messages we have
|
||||||
// invariant: each chain has a bigger gasPerf than the next -- otherwise they can be merged
|
// invariant: each chain has a bigger gasPerf than the next -- otherwise they can be merged
|
||||||
// and increase the gasPerf of the first chain
|
// and increase the gasPerf of the first chain
|
||||||
// We do this in two passes:
|
// We do this in two passes:
|
||||||
// - in the first pass we create chains that aggreagate messages with non-decreasing gasPerf
|
// - in the first pass we create chains that aggregate messages with non-decreasing gasPerf
|
||||||
// - in the second pass we merge chains to maintain the invariant.
|
// - in the second pass we merge chains to maintain the invariant.
|
||||||
var chains []*msgChain
|
var chains []*msgChain
|
||||||
var curChain *msgChain
|
var curChain *msgChain
|
||||||
@ -808,7 +825,7 @@ func (mp *MessagePool) createMessageChains(actor address.Address, mset map[uint6
|
|||||||
gasLimit := curChain.gasLimit + m.Message.GasLimit
|
gasLimit := curChain.gasLimit + m.Message.GasLimit
|
||||||
gasPerf := mp.getGasPerf(gasReward, gasLimit)
|
gasPerf := mp.getGasPerf(gasReward, gasLimit)
|
||||||
|
|
||||||
// try to add the message to the current chain -- if it decreases the gasPerf, then make a
|
// try to add the message to the current chain -- if it decreases the gasPerf, or then make a
|
||||||
// new chain
|
// new chain
|
||||||
if gasPerf < curChain.gasPerf {
|
if gasPerf < curChain.gasPerf {
|
||||||
chains = append(chains, curChain)
|
chains = append(chains, curChain)
|
||||||
@ -868,9 +885,9 @@ func (mc *msgChain) Before(other *msgChain) bool {
|
|||||||
(mc.gasPerf == other.gasPerf && mc.gasReward.Cmp(other.gasReward) > 0)
|
(mc.gasPerf == other.gasPerf && mc.gasReward.Cmp(other.gasReward) > 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mc *msgChain) Trim(gasLimit int64, mp *MessagePool, baseFee types.BigInt) {
|
func (mc *msgChain) Trim(gasLimit int64, msgLimit int, mp *MessagePool, baseFee types.BigInt) {
|
||||||
i := len(mc.msgs) - 1
|
i := len(mc.msgs) - 1
|
||||||
for i >= 0 && (mc.gasLimit > gasLimit || mc.gasPerf < 0) {
|
for i >= 0 && (mc.gasLimit > gasLimit || mc.gasPerf < 0 || i >= msgLimit) {
|
||||||
gasReward := mp.getGasReward(mc.msgs[i], baseFee)
|
gasReward := mp.getGasReward(mc.msgs[i], baseFee)
|
||||||
mc.gasReward = new(big.Int).Sub(mc.gasReward, gasReward)
|
mc.gasReward = new(big.Int).Sub(mc.gasReward, gasReward)
|
||||||
mc.gasLimit -= mc.msgs[i].Message.GasLimit
|
mc.gasLimit -= mc.msgs[i].Message.GasLimit
|
||||||
|
Loading…
Reference in New Issue
Block a user