From d83d585327045620a27f82c9a6c7ae83a67e863c Mon Sep 17 00:00:00 2001 From: Aayush Rajasekaran Date: Wed, 8 Sep 2021 15:22:41 -0400 Subject: [PATCH] Mempool: Selection logic should respect block message limits --- chain/messagepool/selection.go | 77 +++++++++++++++++++++------------- 1 file changed, 47 insertions(+), 30 deletions(-) diff --git a/chain/messagepool/selection.go b/chain/messagepool/selection.go index acff7c4cf..88b1b63af 100644 --- a/chain/messagepool/selection.go +++ b/chain/messagepool/selection.go @@ -92,7 +92,7 @@ func (mp *MessagePool) selectMessagesOptimal(ctx context.Context, curTs, ts *typ result, gasLimit := mp.selectPriorityMessages(ctx, pending, baseFee, ts) // have we filled the block? - if gasLimit < minGas { + if gasLimit < minGas || len(result) >= build.BlockMessageLimit { return result, nil } @@ -117,19 +117,21 @@ func (mp *MessagePool) selectMessagesOptimal(ctx context.Context, curTs, ts *typ return result, nil } - // 3. Parition chains into blocks (without trimming) + // 3. Partition chains into blocks (without trimming) // we use the full blockGasLimit (as opposed to the residual gas limit from the - // priority message selection) as we have to account for what other miners are doing + // priority message selection) as we have to account for what other block providers are doing nextChain := 0 partitions := make([][]*msgChain, MaxBlocks) for i := 0; i < MaxBlocks && nextChain < len(chains); i++ { gasLimit := int64(build.BlockGasLimit) + msgLimit := build.BlockMessageLimit for nextChain < len(chains) { chain := chains[nextChain] nextChain++ partitions[i] = append(partitions[i], chain) gasLimit -= chain.gasLimit - if gasLimit < minGas { + msgLimit -= len(chain.msgs) + if gasLimit < minGas || msgLimit <= 0 { break } } @@ -158,7 +160,7 @@ func (mp *MessagePool) selectMessagesOptimal(ctx context.Context, curTs, ts *typ }) // 6. Merge the head chains to produce the list of messages selected for inclusion - // subject to the residual gas limit + // subject to the residual block limits // When a chain is merged in, all its previous dependent chains *must* also be // merged in or we'll have a broken block startMerge := time.Now() @@ -176,14 +178,16 @@ func (mp *MessagePool) selectMessagesOptimal(ctx context.Context, curTs, ts *typ // compute the dependencies that must be merged and the gas limit including deps chainGasLimit := chain.gasLimit + chainMsgLimit := len(chain.msgs) var chainDeps []*msgChain for curChain := chain.prev; curChain != nil && !curChain.merged; curChain = curChain.prev { chainDeps = append(chainDeps, curChain) chainGasLimit += curChain.gasLimit + chainMsgLimit += len(curChain.msgs) } // does it all fit in the block? - if chainGasLimit <= gasLimit { + if chainGasLimit <= gasLimit && chainMsgLimit+len(result) <= build.BlockMessageLimit { // include it together with all dependencies for i := len(chainDeps) - 1; i >= 0; i-- { curChain := chainDeps[i] @@ -192,7 +196,7 @@ func (mp *MessagePool) selectMessagesOptimal(ctx context.Context, curTs, ts *typ } chain.merged = true - // adjust the effective pefromance for all subsequent chains + // adjust the effective performance for all subsequent chains if next := chain.next; next != nil && next.effPerf > 0 { next.effPerf += next.parentOffset for next = next.next; next != nil && next.effPerf > 0; next = next.next { @@ -222,7 +226,7 @@ func (mp *MessagePool) selectMessagesOptimal(ctx context.Context, curTs, ts *typ // 7. We have reached the edge of what can fit wholesale; if we still hae available // gasLimit to pack some more chains, then trim the last chain and push it down. - // Trimming invalidaates subsequent dependent chains so that they can't be selected + // Trimming invalidates subsequent dependent chains so that they can't be selected // as their dependency cannot be (fully) included. // We do this in a loop because the blocker might have been inordinately large and // we might have to do it multiple times to satisfy tail packing @@ -231,7 +235,7 @@ tailLoop: for gasLimit >= minGas && last < len(chains) { // trim if necessary if chains[last].gasLimit > gasLimit { - chains[last].Trim(gasLimit, mp, baseFee) + chains[last].Trim(gasLimit, build.BlockMessageLimit-len(result), mp, baseFee) } // push down if it hasn't been invalidated @@ -263,16 +267,20 @@ tailLoop: // compute the dependencies that must be merged and the gas limit including deps chainGasLimit := chain.gasLimit + chainMsgLimit := len(chain.msgs) depGasLimit := int64(0) + depMsgLimit := 0 var chainDeps []*msgChain for curChain := chain.prev; curChain != nil && !curChain.merged; curChain = curChain.prev { chainDeps = append(chainDeps, curChain) chainGasLimit += curChain.gasLimit + chainMsgLimit += len(curChain.msgs) depGasLimit += curChain.gasLimit + depMsgLimit += len(curChain.msgs) } // does it all fit in the bock - if chainGasLimit <= gasLimit { + if chainGasLimit <= gasLimit && len(result)+chainMsgLimit <= build.BlockMessageLimit { // include it together with all dependencies for i := len(chainDeps) - 1; i >= 0; i-- { curChain := chainDeps[i] @@ -288,17 +296,17 @@ tailLoop: // it doesn't all fit; now we have to take into account the dependent chains before // making a decision about trimming or invalidating. - // if the dependencies exceed the gas limit, then we must invalidate the chain + // if the dependencies exceed the block limits, then we must invalidate the chain // as it can never be included. // Otherwise we can just trim and continue - if depGasLimit > gasLimit { + if depGasLimit > gasLimit || len(result)+depMsgLimit >= build.BlockMessageLimit { chain.Invalidate() last += i + 1 continue tailLoop } // dependencies fit, just trim it - chain.Trim(gasLimit-depGasLimit, mp, baseFee) + chain.Trim(gasLimit-depGasLimit, build.BlockMessageLimit-len(result)-depMsgLimit, mp, baseFee) last += i continue tailLoop } @@ -311,9 +319,9 @@ tailLoop: log.Infow("pack tail chains done", "took", dt) } - // if we have gasLimit to spare, pick some random (non-negative) chains to fill the block - // we pick randomly so that we minimize the probability of duplication among all miners - if gasLimit >= minGas { + // if we have room to spare, pick some random (non-negative) chains to fill the block + // we pick randomly so that we minimize the probability of duplication among all block producers + if gasLimit >= minGas && len(result) <= build.BlockMessageLimit { randomCount := 0 startRandom := time.Now() @@ -321,7 +329,7 @@ tailLoop: for _, chain := range chains { // have we filled the block - if gasLimit < minGas { + if gasLimit < minGas || len(result) >= build.BlockMessageLimit { break } @@ -337,23 +345,27 @@ tailLoop: // compute the dependencies that must be merged and the gas limit including deps chainGasLimit := chain.gasLimit + chainMsgLimit := len(chain.msgs) depGasLimit := int64(0) + depMsgLimit := 0 var chainDeps []*msgChain for curChain := chain.prev; curChain != nil && !curChain.merged; curChain = curChain.prev { chainDeps = append(chainDeps, curChain) chainGasLimit += curChain.gasLimit + chainMsgLimit += len(curChain.msgs) depGasLimit += curChain.gasLimit + depMsgLimit += len(curChain.msgs) } // do the deps fit? if the deps won't fit, invalidate the chain - if depGasLimit > gasLimit { + if depGasLimit > gasLimit || len(result)+depMsgLimit > build.BlockMessageLimit { chain.Invalidate() continue } // do they fit as is? if it doesn't, trim to make it fit if possible - if chainGasLimit > gasLimit { - chain.Trim(gasLimit-depGasLimit, mp, baseFee) + if chainGasLimit > gasLimit || len(result)+chainMsgLimit > build.BlockMessageLimit { + chain.Trim(gasLimit-depGasLimit, build.BlockMessageLimit-len(result)-depMsgLimit, mp, baseFee) if !chain.valid { continue @@ -416,7 +428,7 @@ func (mp *MessagePool) selectMessagesGreedy(ctx context.Context, curTs, ts *type result, gasLimit := mp.selectPriorityMessages(ctx, pending, baseFee, ts) // have we filled the block? - if gasLimit < minGas { + if gasLimit < minGas || len(result) > build.BlockMessageLimit { return result, nil } @@ -442,7 +454,7 @@ func (mp *MessagePool) selectMessagesGreedy(ctx context.Context, curTs, ts *type } // 3. Merge the head chains to produce the list of messages selected for inclusion, subject to - // the block gas limit. + // the block gas and message limits. startMerge := time.Now() last := len(chains) for i, chain := range chains { @@ -452,13 +464,13 @@ func (mp *MessagePool) selectMessagesGreedy(ctx context.Context, curTs, ts *type } // does it fit in the block? - if chain.gasLimit <= gasLimit { + if chain.gasLimit <= gasLimit && len(result)+len(chain.msgs) <= build.BlockMessageLimit { gasLimit -= chain.gasLimit result = append(result, chain.msgs...) continue } - // we can't fit this chain because of block gasLimit -- we are at the edge + // we can't fit this chain because of block limits -- we are at the edge last = i break } @@ -476,7 +488,7 @@ func (mp *MessagePool) selectMessagesGreedy(ctx context.Context, curTs, ts *type tailLoop: for gasLimit >= minGas && last < len(chains) { // trim - chains[last].Trim(gasLimit, mp, baseFee) + chains[last].Trim(gasLimit, build.BlockMessageLimit-len(result), mp, baseFee) // push down if it hasn't been invalidated if chains[last].valid { @@ -501,7 +513,7 @@ tailLoop: } // does it fit in the bock? - if chain.gasLimit <= gasLimit { + if chain.gasLimit <= gasLimit && len(result)+len(chain.msgs) <= build.BlockMessageLimit { gasLimit -= chain.gasLimit result = append(result, chain.msgs...) continue @@ -778,11 +790,16 @@ func (mp *MessagePool) createMessageChains(actor address.Address, mset map[uint6 return nil } + // if we have more messages from this sender than can fit in a block, drop the extra ones + if len(msgs) > build.BlockMessageLimit { + msgs = msgs[:build.BlockMessageLimit] + } + // ok, now we can construct the chains using the messages we have // invariant: each chain has a bigger gasPerf than the next -- otherwise they can be merged // and increase the gasPerf of the first chain // We do this in two passes: - // - in the first pass we create chains that aggreagate messages with non-decreasing gasPerf + // - in the first pass we create chains that aggregate messages with non-decreasing gasPerf // - in the second pass we merge chains to maintain the invariant. var chains []*msgChain var curChain *msgChain @@ -808,7 +825,7 @@ func (mp *MessagePool) createMessageChains(actor address.Address, mset map[uint6 gasLimit := curChain.gasLimit + m.Message.GasLimit gasPerf := mp.getGasPerf(gasReward, gasLimit) - // try to add the message to the current chain -- if it decreases the gasPerf, then make a + // try to add the message to the current chain -- if it decreases the gasPerf, or then make a // new chain if gasPerf < curChain.gasPerf { chains = append(chains, curChain) @@ -868,9 +885,9 @@ func (mc *msgChain) Before(other *msgChain) bool { (mc.gasPerf == other.gasPerf && mc.gasReward.Cmp(other.gasReward) > 0) } -func (mc *msgChain) Trim(gasLimit int64, mp *MessagePool, baseFee types.BigInt) { +func (mc *msgChain) Trim(gasLimit int64, msgLimit int, mp *MessagePool, baseFee types.BigInt) { i := len(mc.msgs) - 1 - for i >= 0 && (mc.gasLimit > gasLimit || mc.gasPerf < 0) { + for i >= 0 && (mc.gasLimit > gasLimit || mc.gasPerf < 0 || i >= msgLimit) { gasReward := mp.getGasReward(mc.msgs[i], baseFee) mc.gasReward = new(big.Int).Sub(mc.gasReward, gasReward) mc.gasLimit -= mc.msgs[i].Message.GasLimit