feat(BB): PrepareProposal Chain (#163)

This commit is contained in:
David Terpay 2023-06-05 15:10:36 -04:00 committed by GitHub
parent 1b928d883a
commit 61b6759e92
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 229 additions and 141 deletions

View File

@ -1,9 +1,6 @@
package abci
import (
"crypto/sha256"
"encoding/hex"
abci "github.com/cometbft/cometbft/abci/types"
"github.com/cometbft/cometbft/libs/log"
sdk "github.com/cosmos/cosmos-sdk/types"
@ -16,23 +13,67 @@ type (
// handlers.
ProposalHandler struct {
logger log.Logger
mempool blockbuster.Mempool
txEncoder sdk.TxEncoder
prepareLanesHandler blockbuster.PrepareLanesHandler
processLanesHandler blockbuster.ProcessLanesHandler
}
)
// NewProposalHandler returns a new ProposalHandler.
func NewProposalHandler(logger log.Logger, mempool blockbuster.Mempool, txEncoder sdk.TxEncoder) *ProposalHandler {
// NewProposalHandler returns a new abci++ proposal handler.
func NewProposalHandler(logger log.Logger, mempool blockbuster.Mempool) *ProposalHandler {
return &ProposalHandler{
logger: logger,
mempool: mempool,
txEncoder: txEncoder,
prepareLanesHandler: ChainPrepareLanes(mempool.Registry()...),
processLanesHandler: ChainProcessLanes(mempool.Registry()...),
}
}
// ChainProcessLane chains together the proposal verification logic from each lane
// PrepareProposalHandler prepares the proposal by selecting transactions from each lane
// according to each lane's selection logic. We select transactions in a greedy fashion. Note that
// each lane has an boundary on the number of bytes that can be included in the proposal. By default,
// the default lane will not have a boundary on the number of bytes that can be included in the proposal and
// will include all valid transactions in the proposal (up to MaxTxBytes).
func (h *ProposalHandler) PrepareProposalHandler() sdk.PrepareProposalHandler {
return func(ctx sdk.Context, req abci.RequestPrepareProposal) abci.ResponsePrepareProposal {
proposal := h.prepareLanesHandler(ctx, blockbuster.NewProposal(req.MaxTxBytes))
return abci.ResponsePrepareProposal{Txs: proposal.Txs}
}
}
// ProcessProposalHandler processes the proposal by verifying all transactions in the proposal
// according to each lane's verification logic. We verify proposals in a greedy fashion.
// If a lane's portion of the proposal is invalid, we reject the proposal. After a lane's portion
// of the proposal is verified, we pass the remaining transactions to the next lane in the chain.
func (h *ProposalHandler) ProcessProposalHandler() sdk.ProcessProposalHandler {
return func(ctx sdk.Context, req abci.RequestProcessProposal) abci.ResponseProcessProposal {
if _, err := h.processLanesHandler(ctx, req.Txs); err != nil {
h.logger.Error("failed to validate the proposal", "err", err)
return abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_REJECT}
}
return abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_ACCEPT}
}
}
// ChainPrepareLanes chains together the proposal preparation logic from each lane
// into a single function. The first lane in the chain is the first lane to be prepared and
// the last lane in the chain is the last lane to be prepared.
func ChainPrepareLanes(chain ...blockbuster.Lane) blockbuster.PrepareLanesHandler {
if len(chain) == 0 {
return nil
}
// Handle non-terminated decorators chain
if (chain[len(chain)-1] != terminator.Terminator{}) {
chain = append(chain, terminator.Terminator{})
}
return func(ctx sdk.Context, proposal *blockbuster.Proposal) *blockbuster.Proposal {
return chain[0].PrepareLane(ctx, proposal, ChainPrepareLanes(chain[1:]...))
}
}
// ChainProcessLanes chains together the proposal verification logic from each lane
// into a single function. The first lane in the chain is the first lane to be verified and
// the last lane in the chain is the last lane to be verified.
func ChainProcessLanes(chain ...blockbuster.Lane) blockbuster.ProcessLanesHandler {
@ -49,53 +90,3 @@ func ChainProcessLanes(chain ...blockbuster.Lane) blockbuster.ProcessLanesHandle
return chain[0].ProcessLane(ctx, proposalTxs, ChainProcessLanes(chain[1:]...))
}
}
func (h *ProposalHandler) PrepareProposalHandler() sdk.PrepareProposalHandler {
return func(ctx sdk.Context, req abci.RequestPrepareProposal) abci.ResponsePrepareProposal {
var (
selectedTxs = make(map[string][]byte)
totalTxBytes int64
)
for _, l := range h.mempool.Registry() {
if totalTxBytes < req.MaxTxBytes {
laneTxs, err := l.PrepareLane(ctx, req.MaxTxBytes, selectedTxs)
if err != nil {
h.logger.Error("failed to prepare lane; skipping", "lane", l.Name(), "err", err)
continue
}
for _, txBz := range laneTxs {
totalTxBytes += int64(len(txBz))
txHash := sha256.Sum256(txBz)
txHashStr := hex.EncodeToString(txHash[:])
selectedTxs[txHashStr] = txBz
}
}
}
proposalTxs := make([][]byte, 0, len(selectedTxs))
for _, txBz := range selectedTxs {
proposalTxs = append(proposalTxs, txBz)
}
return abci.ResponsePrepareProposal{Txs: proposalTxs}
}
}
// ProcessProposalHandler processes the proposal by verifying all transactions in the proposal
// according to each lane's verification logic. We verify proposals in a greedy fashion.
// If a lane's portion of the proposal is invalid, we reject the proposal. After a lane's portion
// of the proposal is verified, we pass the remaining transactions to the next lane in the chain.
func (h *ProposalHandler) ProcessProposalHandler() sdk.ProcessProposalHandler {
return func(ctx sdk.Context, req abci.RequestProcessProposal) abci.ResponseProcessProposal {
if _, err := h.processLanesHandler(ctx, req.Txs); err != nil {
h.logger.Error("failed to process lanes", "err", err)
return abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_REJECT}
}
return abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_ACCEPT}
}
}

View File

@ -1,12 +1,34 @@
package blockbuster
import (
"crypto/sha256"
"encoding/hex"
"github.com/cometbft/cometbft/libs/log"
sdk "github.com/cosmos/cosmos-sdk/types"
sdkmempool "github.com/cosmos/cosmos-sdk/types/mempool"
)
type (
// Proposal defines a block proposal type.
Proposal struct {
// Txs is the list of transactions in the proposal.
Txs [][]byte
// SelectedTxs is a cache of the selected transactions in the proposal.
SelectedTxs map[string]struct{}
// TotalTxBytes is the total number of bytes currently included in the proposal.
TotalTxBytes int64
// MaxTxBytes is the maximum number of bytes that can be included in the proposal.
MaxTxBytes int64
}
// PrepareLanesHandler wraps all of the lanes Prepare function into a single chained
// function. You can think of it like an AnteHandler, but for preparing proposals in the
// context of lanes instead of modules.
PrepareLanesHandler func(ctx sdk.Context, proposal *Proposal) *Proposal
// ProcessLanesHandler wraps all of the lanes Process functions into a single chained
// function. You can think of it like an AnteHandler, but for processing proposals in the
@ -19,6 +41,12 @@ type (
TxEncoder sdk.TxEncoder
TxDecoder sdk.TxDecoder
AnteHandler sdk.AnteHandler
// MaxBlockSpace defines the relative percentage of block space that can be
// used by this lane. NOTE: If this is set to zero, then there is no limit
// on the number of transactions that can be included in the block for this
// lane (up to maxTxBytes as provided by the request). This is useful for the default lane.
MaxBlockSpace sdk.Dec
}
// Lane defines an interface used for block construction
@ -41,7 +69,7 @@ type (
// number of bytes that can be included in the block and the selected transactions
// thus from from previous lane(s) as mapping from their HEX-encoded hash to
// the raw transaction.
PrepareLane(ctx sdk.Context, maxTxBytes int64, selectedTxs map[string][]byte) ([][]byte, error)
PrepareLane(ctx sdk.Context, proposal *Proposal, next PrepareLanesHandler) *Proposal
// ProcessLane verifies this lane's portion of a proposed block.
ProcessLane(ctx sdk.Context, proposalTxs [][]byte, next ProcessLanesHandler) (sdk.Context, error)
@ -49,11 +77,35 @@ type (
)
// NewLaneConfig returns a new LaneConfig. This will be embedded in a lane.
func NewBaseLaneConfig(logger log.Logger, txEncoder sdk.TxEncoder, txDecoder sdk.TxDecoder, anteHandler sdk.AnteHandler) BaseLaneConfig {
func NewBaseLaneConfig(logger log.Logger, txEncoder sdk.TxEncoder, txDecoder sdk.TxDecoder, anteHandler sdk.AnteHandler, maxBlockSpace sdk.Dec) BaseLaneConfig {
return BaseLaneConfig{
Logger: logger,
TxEncoder: txEncoder,
TxDecoder: txDecoder,
AnteHandler: anteHandler,
Logger: logger,
TxEncoder: txEncoder,
TxDecoder: txDecoder,
AnteHandler: anteHandler,
MaxBlockSpace: maxBlockSpace,
}
}
func NewProposal(maxTxBytes int64) *Proposal {
return &Proposal{
Txs: make([][]byte, 0),
SelectedTxs: make(map[string]struct{}),
MaxTxBytes: maxTxBytes,
}
}
// UpdateProposal updates the proposal with the given transactions and total size.
func (p *Proposal) UpdateProposal(txs [][]byte, totalSize int64) *Proposal {
p.TotalTxBytes += totalSize
p.Txs = append(p.Txs, txs...)
for _, tx := range txs {
txHash := sha256.Sum256(tx)
txHashStr := hex.EncodeToString(txHash[:])
p.SelectedTxs[txHashStr] = struct{}{}
}
return p
}

View File

@ -12,14 +12,21 @@ import (
// PrepareLane will attempt to select the highest bid transaction that is valid
// and whose bundled transactions are valid and include them in the proposal. It
// will return an empty partial proposal if no valid bids are found.
func (l *TOBLane) PrepareLane(ctx sdk.Context, maxTxBytes int64, selectedTxs map[string][]byte) ([][]byte, error) {
var tmpSelectedTxs [][]byte
func (l *TOBLane) PrepareLane(ctx sdk.Context, proposal *blockbuster.Proposal, next blockbuster.PrepareLanesHandler) *blockbuster.Proposal {
// Define all of the info we need to select transactions for the partial proposal.
var (
totalSize int64
txs [][]byte
txsToRemove = make(map[sdk.Tx]struct{}, 0)
)
bidTxIterator := l.Select(ctx, nil)
txsToRemove := make(map[sdk.Tx]struct{}, 0)
// Calculate the max tx bytes for the lane and track the total size of the
// transactions we have selected so far.
maxTxBytes := blockbuster.GetMaxTxBytesForLane(proposal, l.cfg.MaxBlockSpace)
// Attempt to select the highest bid transaction that is valid and whose
// bundled transactions are valid.
bidTxIterator := l.Select(ctx, nil)
selectBidTxLoop:
for ; bidTxIterator != nil; bidTxIterator = bidTxIterator.Next() {
cacheCtx, write := ctx.CacheContext()
@ -28,9 +35,10 @@ selectBidTxLoop:
// if the transaction is already in the (partial) block proposal, we skip it.
txHash, err := blockbuster.GetTxHashStr(l.cfg.TxEncoder, tmpBidTx)
if err != nil {
return nil, fmt.Errorf("failed to get bid tx hash: %w", err)
txsToRemove[tmpBidTx] = struct{}{}
continue
}
if _, ok := selectedTxs[txHash]; ok {
if _, ok := proposal.SelectedTxs[txHash]; ok {
continue selectBidTxLoop
}
@ -73,14 +81,13 @@ selectBidTxLoop:
continue selectBidTxLoop
}
// if the transaction is already in the (partial) block proposal, we skip it.
hash, err := blockbuster.GetTxHashStr(l.cfg.TxEncoder, sdkTx)
if err != nil {
txsToRemove[tmpBidTx] = struct{}{}
continue selectBidTxLoop
}
// if the transaction is already in the (partial) block proposal, we skip it.
if _, ok := selectedTxs[hash]; ok {
if _, ok := proposal.SelectedTxs[hash]; ok {
continue selectBidTxLoop
}
@ -93,8 +100,9 @@ selectBidTxLoop:
// transactions are valid. So we select the bid transaction along with
// all the bundled transactions. We also mark these transactions as seen and
// update the total size selected thus far.
tmpSelectedTxs = append(tmpSelectedTxs, bidTxBz)
tmpSelectedTxs = append(tmpSelectedTxs, bundledTxBz...)
txs = append(txs, bidTxBz)
txs = append(txs, bundledTxBz...)
totalSize = bidTxSize
// Write the cache context to the original context when we know we have a
// valid top of block bundle.
@ -107,18 +115,20 @@ selectBidTxLoop:
l.cfg.Logger.Info(
"failed to select auction bid tx; tx size is too large",
"tx_size", bidTxSize,
"max_size", maxTxBytes,
"max_size", proposal.MaxTxBytes,
)
}
// remove all invalid transactions from the mempool
for tx := range txsToRemove {
if err := l.Remove(tx); err != nil {
return nil, err
}
// Remove all transactions that were invalid during the creation of the partial proposal.
if err := blockbuster.RemoveTxsFromLane(txsToRemove, l.Mempool); err != nil {
l.cfg.Logger.Error("failed to remove txs from mempool", "lane", l.Name(), "err", err)
return proposal
}
return tmpSelectedTxs, nil
// Update the proposal with the selected transactions.
proposal.UpdateProposal(txs, totalSize)
return next(ctx, proposal)
}
// ProcessLane will ensure that block proposals that include transactions from
@ -136,49 +146,50 @@ func (l *TOBLane) ProcessLane(ctx sdk.Context, proposalTxs [][]byte, next blockb
return ctx, err
}
bidInfo, err := l.GetAuctionBidInfo(tx)
if err != nil {
return ctx, fmt.Errorf("failed to get auction bid info for tx %w", err)
}
// If the transaction is an auction bid, then we need to ensure that it is
// the first transaction in the block proposal and that the order of
// transactions in the block proposal follows the order of transactions in
// the bid.
if bidInfo != nil {
if l.Match(tx) {
// If the transaction is an auction bid, then we need to ensure that it is
// the first transaction in the block proposal and that the order of
// transactions in the block proposal follows the order of transactions in
// the bid.
if index != 0 {
return ctx, fmt.Errorf("block proposal did not place auction bid transaction at the top of the lane: %d", index)
}
bundledTransactions := bidInfo.Transactions
if len(proposalTxs) < len(bundledTransactions)+1 {
return ctx, errors.New("block proposal does not contain enough transactions to match the bundled transactions in the auction bid")
bidInfo, err := l.GetAuctionBidInfo(tx)
if err != nil {
return ctx, fmt.Errorf("failed to get auction bid info for tx at index %w", err)
}
for i, refTxRaw := range bundledTransactions {
// Wrap and then encode the bundled transaction to ensure that the underlying
// reference transaction can be processed as an sdk.Tx.
wrappedTx, err := l.WrapBundleTransaction(refTxRaw)
if err != nil {
if bidInfo != nil {
if len(proposalTxs) < len(bidInfo.Transactions)+1 {
return ctx, errors.New("block proposal does not contain enough transactions to match the bundled transactions in the auction bid")
}
for i, refTxRaw := range bidInfo.Transactions {
// Wrap and then encode the bundled transaction to ensure that the underlying
// reference transaction can be processed as an sdk.Tx.
wrappedTx, err := l.WrapBundleTransaction(refTxRaw)
if err != nil {
return ctx, err
}
refTxBz, err := l.cfg.TxEncoder(wrappedTx)
if err != nil {
return ctx, err
}
if !bytes.Equal(refTxBz, proposalTxs[i+1]) {
return ctx, errors.New("block proposal does not match the bundled transactions in the auction bid")
}
}
// Verify the bid transaction.
if err = l.VerifyTx(ctx, tx); err != nil {
return ctx, err
}
refTxBz, err := l.cfg.TxEncoder(wrappedTx)
if err != nil {
return ctx, err
}
if !bytes.Equal(refTxBz, proposalTxs[i+1]) {
return ctx, errors.New("block proposal does not match the bundled transactions in the auction bid")
}
endIndex = len(bidInfo.Transactions) + 1
}
// Verify the bid transaction.
if err = l.VerifyTx(ctx, tx); err != nil {
return ctx, err
}
endIndex += len(bundledTransactions) + 1
}
}

View File

@ -40,12 +40,11 @@ func NewTOBLane(
maxTx int,
anteHandler sdk.AnteHandler,
af Factory,
maxBlockSpace sdk.Dec,
) *TOBLane {
logger = logger.With("lane", LaneName)
return &TOBLane{
Mempool: NewMempool(txEncoder, maxTx, af),
cfg: blockbuster.NewBaseLaneConfig(logger, txEncoder, txDecoder, anteHandler),
cfg: blockbuster.NewBaseLaneConfig(logger, txEncoder, txDecoder, anteHandler, maxBlockSpace),
Factory: af,
}
}

View File

@ -8,11 +8,16 @@ import (
)
// PrepareLane will prepare a partial proposal for the base lane.
func (l *DefaultLane) PrepareLane(ctx sdk.Context, maxTxBytes int64, selectedTxs map[string][]byte) ([][]byte, error) {
func (l *DefaultLane) PrepareLane(ctx sdk.Context, proposal *blockbuster.Proposal, next blockbuster.PrepareLanesHandler) *blockbuster.Proposal {
// Define all of the info we need to select transactions for the partial proposal.
txs := make([][]byte, 0)
txsToRemove := make(map[sdk.Tx]struct{}, 0)
totalSize := int64(0)
// Calculate the max tx bytes for the lane and track the total size of the
// transactions we have selected so far.
maxTxBytes := blockbuster.GetMaxTxBytesForLane(proposal, l.cfg.MaxBlockSpace)
// Select all transactions in the mempool that are valid and not already in the
// partial proposal.
for iterator := l.Mempool.Select(ctx, nil); iterator != nil; iterator = iterator.Next() {
@ -30,11 +35,11 @@ func (l *DefaultLane) PrepareLane(ctx sdk.Context, maxTxBytes int64, selectedTxs
txsToRemove[tx] = struct{}{}
continue
}
if _, ok := selectedTxs[hash]; ok {
if _, ok := proposal.SelectedTxs[hash]; ok {
continue
}
// If the transaction is too large, we skip it.
// If the transaction is too large, we break and do not attempt to include more txs.
txSize := int64(len(txBytes))
if updatedSize := totalSize + txSize; updatedSize > maxTxBytes {
break
@ -52,10 +57,13 @@ func (l *DefaultLane) PrepareLane(ctx sdk.Context, maxTxBytes int64, selectedTxs
// Remove all transactions that were invalid during the creation of the partial proposal.
if err := blockbuster.RemoveTxsFromLane(txsToRemove, l.Mempool); err != nil {
return nil, fmt.Errorf("failed to remove txs from mempool for lane %s: %w", l.Name(), err)
l.cfg.Logger.Error("failed to remove txs from mempool", "lane", l.Name(), "err", err)
return proposal
}
return txs, nil
proposal.UpdateProposal(txs, totalSize)
return next(ctx, proposal)
}
// ProcessLane verifies the default lane's portion of a block proposal.

View File

@ -23,11 +23,10 @@ type DefaultLane struct {
cfg blockbuster.BaseLaneConfig
}
// NewDefaultLane returns a new default lane.
func NewDefaultLane(logger log.Logger, txDecoder sdk.TxDecoder, txEncoder sdk.TxEncoder, anteHandler sdk.AnteHandler) *DefaultLane {
func NewDefaultLane(logger log.Logger, txDecoder sdk.TxDecoder, txEncoder sdk.TxEncoder, anteHandler sdk.AnteHandler, maxBlockSpace sdk.Dec) *DefaultLane {
return &DefaultLane{
Mempool: NewDefaultMempool(txEncoder),
cfg: blockbuster.NewBaseLaneConfig(logger, txEncoder, txDecoder, anteHandler),
cfg: blockbuster.NewBaseLaneConfig(logger, txEncoder, txDecoder, anteHandler, maxBlockSpace),
}
}

View File

@ -34,8 +34,8 @@ type Terminator struct{}
var _ blockbuster.Lane = (*Terminator)(nil)
// PrepareLane is a no-op
func (t Terminator) PrepareLane(_ sdk.Context, _ int64, _ map[string][]byte) ([][]byte, error) {
return nil, nil
func (t Terminator) PrepareLane(_ sdk.Context, proposal *blockbuster.Proposal, _ blockbuster.PrepareLanesHandler) *blockbuster.Proposal {
return proposal
}
// ProcessLane is a no-op

View File

@ -2,7 +2,6 @@ package blockbuster
import (
"context"
"errors"
sdk "github.com/cosmos/cosmos-sdk/types"
sdkmempool "github.com/cosmos/cosmos-sdk/types/mempool"
@ -11,11 +10,15 @@ import (
var _ Mempool = (*BBMempool)(nil)
type (
// Mempool defines the Blockbuster mempool interface.
Mempool interface {
sdkmempool.Mempool
// Registry returns the mempool's lane registry.
Registry() []Lane
// Contains returns true if the transaction is contained in the mempool.
Contains(tx sdk.Tx) (bool, error)
}
// Mempool defines the Blockbuster mempool implement. It contains a registry
@ -52,16 +55,13 @@ func (m *BBMempool) CountTx() int {
// Insert inserts a transaction into every lane that it matches. Insertion will
// be attempted on all lanes, even if an error is encountered.
func (m *BBMempool) Insert(ctx context.Context, tx sdk.Tx) error {
errs := make([]error, 0, len(m.registry))
for _, lane := range m.registry {
if lane.Match(tx) {
err := lane.Insert(ctx, tx)
errs = append(errs, err)
return lane.Insert(ctx, tx)
}
}
return errors.Join(errs...)
return nil
}
// Insert returns a nil iterator.
@ -77,16 +77,24 @@ func (m *BBMempool) Select(_ context.Context, _ [][]byte) sdkmempool.Iterator {
// Remove removes a transaction from every lane that it matches. Removal will be
// attempted on all lanes, even if an error is encountered.
func (m *BBMempool) Remove(tx sdk.Tx) error {
errs := make([]error, 0, len(m.registry))
for _, lane := range m.registry {
if lane.Match(tx) {
err := lane.Remove(tx)
errs = append(errs, err)
return lane.Remove(tx)
}
}
return errors.Join(errs...)
return nil
}
// Contains returns true if the transaction is contained in the mempool.
func (m *BBMempool) Contains(tx sdk.Tx) (bool, error) {
for _, lane := range m.registry {
if lane.Match(tx) {
return lane.Contains(tx)
}
}
return false, nil
}
// Registry returns the mempool's lane registry.

View File

@ -32,3 +32,23 @@ func RemoveTxsFromLane(txs map[sdk.Tx]struct{}, mempool sdkmempool.Mempool) erro
return nil
}
// GetMaxTxBytesForLane returns the maximum number of bytes that can be included in the proposal
// for the given lane.
func GetMaxTxBytesForLane(proposal *Proposal, ratio sdk.Dec) int64 {
// In the case where the ratio is zero, we return the max tx bytes remaining. Note, the only
// lane that should have a ratio of zero is the default lane. This means the default lane
// will have no limit on the number of transactions it can include in a block and is only
// limited by the maxTxBytes included in the PrepareProposalRequest.
if ratio.IsZero() {
remainder := proposal.MaxTxBytes - proposal.TotalTxBytes
if remainder < 0 {
return 0
}
return remainder
}
// Otherwise, we calculate the max tx bytes for the lane based on the ratio.
return ratio.MulInt64(proposal.MaxTxBytes).TruncateInt().Int64()
}