From cee39a9eb7edb7c32dedf09663814f4884dda15a Mon Sep 17 00:00:00 2001 From: David Terpay <35130517+davidterpay@users.noreply.github.com> Date: Fri, 9 Jun 2023 10:50:49 -0400 Subject: [PATCH] feat(BB): Better proposal, better docs, better logging, papa johns (#172) --- abci/abci_test.go | 4 +- abci/auction_test.go | 12 +- abci/proposal_auction.go | 28 ++-- abci/proposals.go | 22 ++- abci/proposals_test.go | 25 ++-- blockbuster/abci/abci.go | 44 ++++-- blockbuster/abci/abci_test.go | 10 +- blockbuster/lane.go | 45 +----- blockbuster/lanes/auction/abci.go | 27 ++-- blockbuster/lanes/base/abci.go | 31 +++-- blockbuster/lanes/base/lane.go | 8 +- blockbuster/lanes/base/mempool.go | 2 + blockbuster/lanes/terminator/lane.go | 4 +- blockbuster/mempool.go | 46 ++++++- blockbuster/proposals.go | 197 +++++++++++++++++++++++++++ blockbuster/utils/utils.go | 7 +- blockbuster/utils/utils_test.go | 57 +++----- 17 files changed, 404 insertions(+), 165 deletions(-) create mode 100644 blockbuster/proposals.go diff --git a/abci/abci_test.go b/abci/abci_test.go index 2d698b2..8bd404d 100644 --- a/abci/abci_test.go +++ b/abci/abci_test.go @@ -272,11 +272,11 @@ func (suite *ABCITestSuite) createExtendedCommitInfoFromTxBzs(txs [][]byte) []by return commitInfoBz } -func (suite *ABCITestSuite) createAuctionInfoFromTxBzs(txs [][]byte, numTxs uint64) []byte { +func (suite *ABCITestSuite) createAuctionInfoFromTxBzs(txs [][]byte, numTxs uint64, maxTxBytes int64) []byte { auctionInfo := abci.AuctionInfo{ ExtendedCommitInfo: suite.createExtendedCommitInfoFromTxBzs(txs), NumTxs: numTxs, - MaxTxBytes: int64(len(txs[0])), + MaxTxBytes: maxTxBytes, } auctionInfoBz, err := auctionInfo.Marshal() diff --git a/abci/auction_test.go b/abci/auction_test.go index c76bc85..e5ad50e 100644 --- a/abci/auction_test.go +++ b/abci/auction_test.go @@ -481,11 +481,11 @@ func (suite *ABCITestSuite) TestBuildTOB() { proposal := suite.proposalHandler.BuildTOB(suite.ctx, commitInfo, tc.maxBytes) // Size of the proposal should be less than or equal to the max bytes - suite.Require().LessOrEqual(proposal.TotalTxBytes, tc.maxBytes) + suite.Require().LessOrEqual(proposal.GetTotalTxBytes(), tc.maxBytes) if winningBid == nil { - suite.Require().Len(proposal.Txs, 0) - suite.Require().Equal(proposal.TotalTxBytes, int64(0)) + suite.Require().Len(proposal.GetTxs(), 0) + suite.Require().Equal(proposal.GetTotalTxBytes(), int64(0)) } else { // Get info about the winning bid winningBidBz, err := suite.encodingConfig.TxConfig.TxEncoder()(winningBid) @@ -496,13 +496,13 @@ func (suite *ABCITestSuite) TestBuildTOB() { // Verify that the size of the proposal is the size of the winning bid // plus the size of the bundle - suite.Require().Equal(len(proposal.Txs), len(auctionBidInfo.Transactions)+1) + suite.Require().Equal(len(proposal.GetTxs()), len(auctionBidInfo.Transactions)+1) // Verify that the winning bid is the first transaction in the proposal - suite.Require().Equal(proposal.Txs[0], winningBidBz) + suite.Require().Equal(proposal.GetTxs()[0], winningBidBz) // Verify the ordering of transactions in the proposal - for index, tx := range proposal.Txs[1:] { + for index, tx := range proposal.GetTxs()[1:] { suite.Equal(tx, auctionBidInfo.Transactions[index]) } } diff --git a/abci/proposal_auction.go b/abci/proposal_auction.go index 629e378..9517a9a 100644 --- a/abci/proposal_auction.go +++ b/abci/proposal_auction.go @@ -91,7 +91,7 @@ func (h *ProposalHandler) VerifyTOB(ctx sdk.Context, proposalTxs [][]byte) (*Auc // Verify that the top of block txs matches the top of block proposal txs. actualTOBTxs := proposalTxs[NumInjectedTxs : auctionInfo.NumTxs+NumInjectedTxs] - if !reflect.DeepEqual(actualTOBTxs, expectedTOB.Txs) { + if !reflect.DeepEqual(actualTOBTxs, expectedTOB.GetTxs()) { return nil, fmt.Errorf("expected top of block txs does not match top of block proposal") } @@ -140,16 +140,17 @@ func (h *ProposalHandler) buildTOB(ctx sdk.Context, bidTx sdk.Tx, maxBytes int64 proposal := blockbuster.NewProposal(maxBytes) // cache the bytes of the bid transaction - txBz, hash, err := utils.GetTxHashStr(h.txEncoder, bidTx) + txBz, _, err := utils.GetTxHashStr(h.txEncoder, bidTx) if err != nil { return proposal, err } - proposal.Cache[hash] = struct{}{} - proposal.TotalTxBytes = int64(len(txBz)) - proposal.Txs = append(proposal.Txs, txBz) - - if int64(len(txBz)) > maxBytes { + maxBytesForLane := utils.GetMaxTxBytesForLane( + proposal.GetMaxTxBytes(), + proposal.GetTotalTxBytes(), + h.tobLane.GetMaxBlockSpace(), + ) + if int64(len(txBz)) > maxBytesForLane { return proposal, fmt.Errorf("bid transaction is too large; got %d, max %d", len(txBz), maxBytes) } @@ -164,10 +165,10 @@ func (h *ProposalHandler) buildTOB(ctx sdk.Context, bidTx sdk.Tx, maxBytes int64 } // store the bytes of each ref tx as sdk.Tx bytes in order to build a valid proposal - sdkTxBytes := make([][]byte, len(bidInfo.Transactions)) + txs := [][]byte{txBz} // Ensure that the bundled transactions are valid - for index, rawRefTx := range bidInfo.Transactions { + for _, rawRefTx := range bidInfo.Transactions { // convert the bundled raw transaction to a sdk.Tx refTx, err := h.tobLane.WrapBundleTransaction(rawRefTx) if err != nil { @@ -175,17 +176,18 @@ func (h *ProposalHandler) buildTOB(ctx sdk.Context, bidTx sdk.Tx, maxBytes int64 } // convert the sdk.Tx to a hash and bytes - txBz, hash, err := utils.GetTxHashStr(h.txEncoder, refTx) + txBz, _, err := utils.GetTxHashStr(h.txEncoder, refTx) if err != nil { return proposal, err } - proposal.Cache[hash] = struct{}{} - sdkTxBytes[index] = txBz + txs = append(txs, txBz) } // Add the bundled transactions to the proposal. - proposal.Txs = append(proposal.Txs, sdkTxBytes...) + if err := proposal.UpdateProposal(h.tobLane, txs); err != nil { + return proposal, err + } return proposal, nil } diff --git a/abci/proposals.go b/abci/proposals.go index 99487af..4716b41 100644 --- a/abci/proposals.go +++ b/abci/proposals.go @@ -42,6 +42,16 @@ type ( // ProcessLaneBasic is utilized to verify the rest of the proposal according to // the preferences of the top of block lane. ProcessLaneBasic(txs []sdk.Tx) error + + // GetMaxBlockSpace returns the maximum block space that can be used by the top of + // block lane as a percentage of the total block space. + GetMaxBlockSpace() sdk.Dec + + // Logger returns the logger for the top of block lane. + Logger() log.Logger + + // Name returns the name of the top of block lane. + Name() string } // ProposalHandler contains the functionality and handlers required to\ @@ -94,7 +104,7 @@ func (h *ProposalHandler) PrepareProposalHandler() sdk.PrepareProposalHandler { auctionInfo := &AuctionInfo{ ExtendedCommitInfo: lastCommitInfo, MaxTxBytes: req.MaxTxBytes, - NumTxs: uint64(len(topOfBlock.Txs)), + NumTxs: uint64(topOfBlock.GetNumTxs()), } // Add the auction info and top of block transactions into the proposal. @@ -104,13 +114,17 @@ func (h *ProposalHandler) PrepareProposalHandler() sdk.PrepareProposalHandler { return cometabci.ResponsePrepareProposal{Txs: nil} } - topOfBlock.Txs = append([][]byte{auctionInfoBz}, topOfBlock.Txs...) + topOfBlock.AddVoteExtension(auctionInfoBz) // Prepare the proposal by selecting transactions from each lane according to // each lane's selection logic. - proposal := h.prepareLanesHandler(ctx, topOfBlock) + proposal, err := h.prepareLanesHandler(ctx, topOfBlock) + if err != nil { + h.logger.Error("failed to prepare proposal", "err", err) + return cometabci.ResponsePrepareProposal{Txs: nil} + } - return cometabci.ResponsePrepareProposal{Txs: proposal.Txs} + return cometabci.ResponsePrepareProposal{Txs: proposal.GetProposal()} } } diff --git a/abci/proposals_test.go b/abci/proposals_test.go index f84e764..b059ea6 100644 --- a/abci/proposals_test.go +++ b/abci/proposals_test.go @@ -13,6 +13,8 @@ import ( buildertypes "github.com/skip-mev/pob/x/builder/types" ) +// TODO: +// - Add tests that can that trigger a panic for the tob of block lane func (suite *ABCITestSuite) TestPrepareProposal() { var ( // the modified transactions cannot exceed this size @@ -355,12 +357,15 @@ func (suite *ABCITestSuite) TestPrepareProposal() { } } +// TODO: +// - Add tests that ensure that the top of block lane does not propose more transactions than it is allowed to func (suite *ABCITestSuite) TestProcessProposal() { var ( // auction configuration maxBundleSize uint32 = 10 reserveFee = sdk.NewCoin("foo", sdk.NewInt(1000)) frontRunningProtection = true + maxTxBytes int64 = 1000000000000000000 // mempool configuration proposal [][]byte @@ -448,7 +453,7 @@ func (suite *ABCITestSuite) TestProcessProposal() { normalTx, err := testutils.CreateRandomTxBz(suite.encodingConfig.TxConfig, account, nonce, numberMsgs, timeout) suite.Require().NoError(err) - auctionInfo := suite.createAuctionInfoFromTxBzs([][]byte{bidTx}, 2) + auctionInfo := suite.createAuctionInfoFromTxBzs([][]byte{bidTx}, 2, maxTxBytes) proposal = [][]byte{ auctionInfo, @@ -470,7 +475,7 @@ func (suite *ABCITestSuite) TestProcessProposal() { bidTxBz, err := testutils.CreateAuctionTxWithSignerBz(suite.encodingConfig.TxConfig, bidder, bid, nonce, timeout, signers) suite.Require().NoError(err) - auctionInfo := suite.createAuctionInfoFromTxBzs([][]byte{bidTxBz}, 2) + auctionInfo := suite.createAuctionInfoFromTxBzs([][]byte{bidTxBz}, 2, maxTxBytes) bidInfo := suite.getAuctionBidInfoFromTxBz(bidTxBz) @@ -496,7 +501,7 @@ func (suite *ABCITestSuite) TestProcessProposal() { bidTxBz, err := testutils.CreateAuctionTxWithSignerBz(suite.encodingConfig.TxConfig, bidder, bid, nonce, timeout, signers) suite.Require().NoError(err) - auctionInfo := suite.createAuctionInfoFromTxBzs([][]byte{bidTxBz}, 3) + auctionInfo := suite.createAuctionInfoFromTxBzs([][]byte{bidTxBz}, 3, maxTxBytes) bidInfo := suite.getAuctionBidInfoFromTxBz(bidTxBz) @@ -521,7 +526,7 @@ func (suite *ABCITestSuite) TestProcessProposal() { bidTxBz, err := testutils.CreateAuctionTxWithSignerBz(suite.encodingConfig.TxConfig, bidder, bid, nonce, timeout, signers) suite.Require().NoError(err) - auctionInfo := suite.createAuctionInfoFromTxBzs([][]byte{bidTxBz}, 3) + auctionInfo := suite.createAuctionInfoFromTxBzs([][]byte{bidTxBz}, 3, maxTxBytes) bidInfo := suite.getAuctionBidInfoFromTxBz(bidTxBz) proposal = append( @@ -555,7 +560,7 @@ func (suite *ABCITestSuite) TestProcessProposal() { bidTxBz2, err := testutils.CreateAuctionTxWithSignerBz(suite.encodingConfig.TxConfig, bidder, bid, nonce, timeout, signers) suite.Require().NoError(err) - auctionInfo := suite.createAuctionInfoFromTxBzs([][]byte{bidTxBz, bidTxBz2}, 3) + auctionInfo := suite.createAuctionInfoFromTxBzs([][]byte{bidTxBz, bidTxBz2}, 3, maxTxBytes) bidInfo := suite.getAuctionBidInfoFromTxBz(bidTxBz) @@ -590,7 +595,7 @@ func (suite *ABCITestSuite) TestProcessProposal() { bidTxBz2, err := testutils.CreateAuctionTxWithSignerBz(suite.encodingConfig.TxConfig, bidder, bid, nonce, timeout, signers) suite.Require().NoError(err) - auctionInfo := suite.createAuctionInfoFromTxBzs([][]byte{bidTxBz, bidTxBz2}, 2) + auctionInfo := suite.createAuctionInfoFromTxBzs([][]byte{bidTxBz, bidTxBz2}, 2, maxTxBytes) bidInfo := suite.getAuctionBidInfoFromTxBz(bidTxBz2) @@ -625,7 +630,7 @@ func (suite *ABCITestSuite) TestProcessProposal() { bidTxBz2, err := testutils.CreateAuctionTxWithSignerBz(suite.encodingConfig.TxConfig, bidder, bid, nonce, timeout, signers) suite.Require().NoError(err) - auctionInfo := suite.createAuctionInfoFromTxBzs([][]byte{bidTxBz, bidTxBz2}, 2) + auctionInfo := suite.createAuctionInfoFromTxBzs([][]byte{bidTxBz, bidTxBz2}, 2, maxTxBytes) bidInfo := suite.getAuctionBidInfoFromTxBz(bidTxBz2) bidInfo2 := suite.getAuctionBidInfoFromTxBz(bidTxBz) @@ -655,7 +660,7 @@ func (suite *ABCITestSuite) TestProcessProposal() { bidTxBz, err := testutils.CreateAuctionTxWithSignerBz(suite.encodingConfig.TxConfig, bidder, bid, nonce, timeout, signers) suite.Require().NoError(err) - auctionInfo := suite.createAuctionInfoFromTxBzs([][]byte{bidTxBz}, 2) + auctionInfo := suite.createAuctionInfoFromTxBzs([][]byte{bidTxBz}, 2, maxTxBytes) bidInfo := suite.getAuctionBidInfoFromTxBz(bidTxBz) @@ -683,7 +688,7 @@ func (suite *ABCITestSuite) TestProcessProposal() { bidTxBz, err := testutils.CreateAuctionTxWithSignerBz(suite.encodingConfig.TxConfig, bidder, bid, nonce, timeout, signers) suite.Require().NoError(err) - auctionInfo := suite.createAuctionInfoFromTxBzs([][]byte{bidTxBz}, 2) + auctionInfo := suite.createAuctionInfoFromTxBzs([][]byte{bidTxBz}, 2, maxTxBytes) bidInfo := suite.getAuctionBidInfoFromTxBz(bidTxBz) @@ -717,7 +722,7 @@ func (suite *ABCITestSuite) TestProcessProposal() { bidTxBz, err := testutils.CreateAuctionTxWithSignerBz(suite.encodingConfig.TxConfig, bidder, bid, nonce, timeout, signers) suite.Require().NoError(err) - auctionInfo := suite.createAuctionInfoFromTxBzs([][]byte{bidTxBz}, uint64(len(signers)+1)) + auctionInfo := suite.createAuctionInfoFromTxBzs([][]byte{bidTxBz}, uint64(len(signers)+1), maxTxBytes) bidInfo := suite.getAuctionBidInfoFromTxBz(bidTxBz) diff --git a/blockbuster/abci/abci.go b/blockbuster/abci/abci.go index c90212e..a9f0624 100644 --- a/blockbuster/abci/abci.go +++ b/blockbuster/abci/abci.go @@ -45,13 +45,21 @@ func (h *ProposalHandler) PrepareProposalHandler() sdk.PrepareProposalHandler { } }() - proposal := h.prepareLanesHandler(ctx, blockbuster.NewProposal(req.MaxTxBytes)) - - resp = abci.ResponsePrepareProposal{ - Txs: proposal.Txs, + proposal, err := h.prepareLanesHandler(ctx, blockbuster.NewProposal(req.MaxTxBytes)) + if err != nil { + h.logger.Error("failed to prepare proposal", "err", err) + return abci.ResponsePrepareProposal{Txs: make([][]byte, 0)} } - return + h.logger.Info( + "prepared proposal", + "num_txs", proposal.GetNumTxs(), + "total_tx_bytes", proposal.GetTotalTxBytes(), + ) + + return abci.ResponsePrepareProposal{ + Txs: proposal.GetProposal(), + } } } @@ -69,8 +77,14 @@ func (h *ProposalHandler) ProcessProposalHandler() sdk.ProcessProposalHandler { } }() + txs := req.Txs + if len(txs) == 0 { + h.logger.Info("accepted empty proposal") + return abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_ACCEPT} + } + // Decode the transactions from the proposal. - decodedTxs, err := utils.GetDecodedTxs(h.txDecoder, req.Txs) + decodedTxs, err := utils.GetDecodedTxs(h.txDecoder, txs) if err != nil { h.logger.Error("failed to decode transactions", "err", err) return abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_REJECT} @@ -82,6 +96,8 @@ func (h *ProposalHandler) ProcessProposalHandler() sdk.ProcessProposalHandler { return abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_REJECT} } + h.logger.Info("validated proposal", "num_txs", len(txs)) + return abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_ACCEPT} } } @@ -102,7 +118,7 @@ func ChainPrepareLanes(chain ...blockbuster.Lane) blockbuster.PrepareLanesHandle chain = append(chain, terminator.Terminator{}) } - return func(ctx sdk.Context, partialProposal *blockbuster.Proposal) (finalProposal *blockbuster.Proposal) { + return func(ctx sdk.Context, partialProposal blockbuster.BlockProposal) (finalProposal blockbuster.BlockProposal, err error) { lane := chain[0] lane.Logger().Info("preparing lane", "lane", lane.Name()) @@ -110,8 +126,8 @@ func ChainPrepareLanes(chain ...blockbuster.Lane) blockbuster.PrepareLanesHandle cacheCtx, write := ctx.CacheContext() defer func() { - if err := recover(); err != nil { - lane.Logger().Error("failed to prepare lane", "lane", lane.Name(), "err", err) + if rec := recover(); rec != nil || err != nil { + lane.Logger().Error("failed to prepare lane", "lane", lane.Name(), "err", err, "recover_error", rec) lanesRemaining := len(chain) switch { @@ -119,18 +135,19 @@ func ChainPrepareLanes(chain ...blockbuster.Lane) blockbuster.PrepareLanesHandle // If there are only two lanes remaining, then the first lane in the chain // is the lane that failed to prepare the partial proposal and the second lane in the // chain is the terminator lane. We return the proposal as is. - finalProposal = partialProposal + finalProposal, err = partialProposal, nil default: // If there are more than two lanes remaining, then the first lane in the chain // is the lane that failed to prepare the proposal but the second lane in the // chain is not the terminator lane so there could potentially be more transactions // added to the proposal maxTxBytesForLane := utils.GetMaxTxBytesForLane( - partialProposal, + partialProposal.GetMaxTxBytes(), + partialProposal.GetTotalTxBytes(), chain[1].GetMaxBlockSpace(), ) - finalProposal = chain[1].PrepareLane( + finalProposal, err = chain[1].PrepareLane( ctx, partialProposal, maxTxBytesForLane, @@ -146,7 +163,8 @@ func ChainPrepareLanes(chain ...blockbuster.Lane) blockbuster.PrepareLanesHandle // Get the maximum number of bytes that can be included in the proposal for this lane. maxTxBytesForLane := utils.GetMaxTxBytesForLane( - partialProposal, + partialProposal.GetMaxTxBytes(), + partialProposal.GetTotalTxBytes(), lane.GetMaxBlockSpace(), ) diff --git a/blockbuster/abci/abci_test.go b/blockbuster/abci/abci_test.go index e4bf6f2..8218887 100644 --- a/blockbuster/abci/abci_test.go +++ b/blockbuster/abci/abci_test.go @@ -26,8 +26,7 @@ import ( type ABCITestSuite struct { suite.Suite - logger log.Logger - ctx sdk.Context + ctx sdk.Context // Define basic tx configuration encodingConfig testutils.EncodingConfig @@ -333,8 +332,9 @@ func (suite *ABCITestSuite) TestPrepareProposal() { suite.Require().NoError(err) freeSize := int64(len(freeTxBytes)) - maxTxBytes = tobSize + freeSize - 1 + maxTxBytes = tobSize*2 + freeSize - 1 suite.tobConfig.MaxBlockSpace = sdk.ZeroDec() + suite.freeConfig.MaxBlockSpace = sdk.MustNewDecFromStr("0.1") txs = []sdk.Tx{freeTx} auctionTxs = []sdk.Tx{bidTx} @@ -388,7 +388,7 @@ func (suite *ABCITestSuite) TestPrepareProposal() { suite.Require().NoError(err) normalSize := int64(len(normalTxBytes)) - maxTxBytes = tobSize + freeSize + normalSize + 1 + maxTxBytes = tobSize*2 + freeSize + normalSize + 1 // Tob can take up as much space as it wants suite.tobConfig.MaxBlockSpace = sdk.ZeroDec() @@ -693,7 +693,7 @@ func (suite *ABCITestSuite) TestPrepareProposal() { } // Create a new proposal handler - suite.proposalHandler = abci.NewProposalHandler(suite.logger, suite.encodingConfig.TxConfig.TxDecoder(), suite.mempool) + suite.proposalHandler = abci.NewProposalHandler(log.NewNopLogger(), suite.encodingConfig.TxConfig.TxDecoder(), suite.mempool) handler := suite.proposalHandler.PrepareProposalHandler() res := handler(suite.ctx, abcitypes.RequestPrepareProposal{ MaxTxBytes: maxTxBytes, diff --git a/blockbuster/lane.go b/blockbuster/lane.go index f0f8c8c..820db75 100644 --- a/blockbuster/lane.go +++ b/blockbuster/lane.go @@ -1,8 +1,6 @@ package blockbuster import ( - "crypto/sha256" - "encoding/hex" "fmt" "github.com/cometbft/cometbft/libs/log" @@ -11,25 +9,10 @@ import ( ) type ( - // Proposal defines a block proposal type. - Proposal struct { - // Txs is the list of transactions in the proposal. - Txs [][]byte - - // Cache is a cache of the selected transactions in the proposal. - Cache map[string]struct{} - - // TotalTxBytes is the total number of bytes currently included in the proposal. - TotalTxBytes int64 - - // MaxTxBytes is the maximum number of bytes that can be included in the proposal. - MaxTxBytes int64 - } - // PrepareLanesHandler wraps all of the lanes Prepare function into a single chained // function. You can think of it like an AnteHandler, but for preparing proposals in the // context of lanes instead of modules. - PrepareLanesHandler func(ctx sdk.Context, proposal *Proposal) *Proposal + PrepareLanesHandler func(ctx sdk.Context, proposal BlockProposal) (BlockProposal, error) // ProcessLanesHandler wraps all of the lanes Process functions into a single chained // function. You can think of it like an AnteHandler, but for processing proposals in the @@ -78,7 +61,7 @@ type ( // included in the proposal for the given lane, the partial proposal, and a function // to call the next lane in the chain. The next lane in the chain will be called with // the updated proposal and context. - PrepareLane(ctx sdk.Context, proposal *Proposal, maxTxBytes int64, next PrepareLanesHandler) *Proposal + PrepareLane(ctx sdk.Context, proposal BlockProposal, maxTxBytes int64, next PrepareLanesHandler) (BlockProposal, error) // ProcessLaneBasic validates that transactions belonging to this lane are not misplaced // in the block proposal. @@ -131,27 +114,3 @@ func (c *BaseLaneConfig) ValidateBasic() error { return nil } - -// NewProposal returns a new empty proposal. -func NewProposal(maxTxBytes int64) *Proposal { - return &Proposal{ - Txs: make([][]byte, 0), - Cache: make(map[string]struct{}), - MaxTxBytes: maxTxBytes, - } -} - -// UpdateProposal updates the proposal with the given transactions and total size. -func (p *Proposal) UpdateProposal(txs [][]byte, totalSize int64) *Proposal { - p.TotalTxBytes += totalSize - p.Txs = append(p.Txs, txs...) - - for _, tx := range txs { - txHash := sha256.Sum256(tx) - txHashStr := hex.EncodeToString(txHash[:]) - - p.Cache[txHashStr] = struct{}{} - } - - return p -} diff --git a/blockbuster/lanes/auction/abci.go b/blockbuster/lanes/auction/abci.go index 2237d79..fc876c3 100644 --- a/blockbuster/lanes/auction/abci.go +++ b/blockbuster/lanes/auction/abci.go @@ -14,13 +14,12 @@ import ( // will return an empty partial proposal if no valid bids are found. func (l *TOBLane) PrepareLane( ctx sdk.Context, - proposal *blockbuster.Proposal, + proposal blockbuster.BlockProposal, maxTxBytes int64, next blockbuster.PrepareLanesHandler, -) *blockbuster.Proposal { +) (blockbuster.BlockProposal, error) { // Define all of the info we need to select transactions for the partial proposal. var ( - totalSize int64 txs [][]byte txsToRemove = make(map[sdk.Tx]struct{}, 0) ) @@ -33,14 +32,14 @@ selectBidTxLoop: cacheCtx, write := ctx.CacheContext() tmpBidTx := bidTxIterator.Tx() - bidTxBz, txHash, err := utils.GetTxHashStr(l.Cfg.TxEncoder, tmpBidTx) + bidTxBz, _, err := utils.GetTxHashStr(l.Cfg.TxEncoder, tmpBidTx) if err != nil { txsToRemove[tmpBidTx] = struct{}{} - continue + continue selectBidTxLoop } // if the transaction is already in the (partial) block proposal, we skip it. - if _, ok := proposal.Cache[txHash]; ok { + if proposal.Contains(bidTxBz) { continue selectBidTxLoop } @@ -71,14 +70,14 @@ selectBidTxLoop: continue selectBidTxLoop } - sdkTxBz, hash, err := utils.GetTxHashStr(l.Cfg.TxEncoder, sdkTx) + sdkTxBz, _, err := utils.GetTxHashStr(l.Cfg.TxEncoder, sdkTx) if err != nil { txsToRemove[tmpBidTx] = struct{}{} continue selectBidTxLoop } // if the transaction is already in the (partial) block proposal, we skip it. - if _, ok := proposal.Cache[hash]; ok { + if proposal.Contains(sdkTxBz) { continue selectBidTxLoop } @@ -93,7 +92,6 @@ selectBidTxLoop: // update the total size selected thus far. txs = append(txs, bidTxBz) txs = append(txs, bundledTxBz...) - totalSize = bidTxSize // Write the cache context to the original context when we know we have a // valid top of block bundle. @@ -111,12 +109,15 @@ selectBidTxLoop: // Remove all transactions that were invalid during the creation of the partial proposal. if err := utils.RemoveTxsFromLane(txsToRemove, l.Mempool); err != nil { - l.Cfg.Logger.Error("failed to remove txs from mempool", "lane", l.Name(), "err", err) - return proposal + return proposal, err } - // Update the proposal with the selected transactions. - proposal.UpdateProposal(txs, totalSize) + // Update the proposal with the selected transactions. This will only return an error + // if the invarient checks are not passed. In the case when this errors, the original proposal + // will be returned (without the selected transactions from this lane). + if err := proposal.UpdateProposal(l, txs); err != nil { + return proposal, err + } return next(ctx, proposal) } diff --git a/blockbuster/lanes/base/abci.go b/blockbuster/lanes/base/abci.go index cd62d3a..08a6db2 100644 --- a/blockbuster/lanes/base/abci.go +++ b/blockbuster/lanes/base/abci.go @@ -8,13 +8,15 @@ import ( "github.com/skip-mev/pob/blockbuster/utils" ) -// PrepareLane will prepare a partial proposal for the base lane. +// PrepareLane will prepare a partial proposal for the default lane. It will select and include +// all valid transactions in the mempool that are not already in the partial proposal. +// The default lane orders transactions by the sdk.Context priority. func (l *DefaultLane) PrepareLane( ctx sdk.Context, - proposal *blockbuster.Proposal, + proposal blockbuster.BlockProposal, maxTxBytes int64, next blockbuster.PrepareLanesHandler, -) *blockbuster.Proposal { +) (blockbuster.BlockProposal, error) { // Define all of the info we need to select transactions for the partial proposal. var ( totalSize int64 @@ -27,14 +29,14 @@ func (l *DefaultLane) PrepareLane( for iterator := l.Mempool.Select(ctx, nil); iterator != nil; iterator = iterator.Next() { tx := iterator.Tx() - txBytes, hash, err := utils.GetTxHashStr(l.Cfg.TxEncoder, tx) + txBytes, _, err := utils.GetTxHashStr(l.Cfg.TxEncoder, tx) if err != nil { txsToRemove[tx] = struct{}{} continue } // if the transaction is already in the (partial) block proposal, we skip it. - if _, ok := proposal.Cache[hash]; ok { + if proposal.Contains(txBytes) { continue } @@ -56,16 +58,22 @@ func (l *DefaultLane) PrepareLane( // Remove all transactions that were invalid during the creation of the partial proposal. if err := utils.RemoveTxsFromLane(txsToRemove, l.Mempool); err != nil { - l.Cfg.Logger.Error("failed to remove txs from mempool", "lane", l.Name(), "err", err) - return proposal + return proposal, err } - proposal.UpdateProposal(txs, totalSize) + // Update the partial proposal with the selected transactions. If the proposal is unable to + // be updated, we return an error. The proposal will only be modified if it passes all + // of the invarient checks. + if err := proposal.UpdateProposal(l, txs); err != nil { + return proposal, err + } return next(ctx, proposal) } -// ProcessLane verifies the default lane's portion of a block proposal. +// ProcessLane verifies the default lane's portion of a block proposal. Since the default lane's +// ProcessLaneBasic function ensures that all of the default transactions are in the correct order, +// we only need to verify the contiguous set of transactions that match to the default lane. func (l *DefaultLane) ProcessLane(ctx sdk.Context, txs []sdk.Tx, next blockbuster.ProcessLanesHandler) (sdk.Context, error) { for index, tx := range txs { if l.Match(tx) { @@ -81,8 +89,9 @@ func (l *DefaultLane) ProcessLane(ctx sdk.Context, txs []sdk.Tx, next blockbuste return ctx, nil } -// ProcessLaneBasic does basic validation on the block proposal to ensure that -// transactions that belong to this lane are not misplaced in the block proposal. +// transactions that belong to this lane are not misplaced in the block proposal i.e. +// the proposal only contains contiguous transactions that belong to this lane - there +// can be no interleaving of transactions from other lanes. func (l *DefaultLane) ProcessLaneBasic(txs []sdk.Tx) error { seenOtherLaneTx := false lastSeenIndex := 0 diff --git a/blockbuster/lanes/base/lane.go b/blockbuster/lanes/base/lane.go index 5a849df..1129b31 100644 --- a/blockbuster/lanes/base/lane.go +++ b/blockbuster/lanes/base/lane.go @@ -13,8 +13,12 @@ const ( var _ blockbuster.Lane = (*DefaultLane)(nil) -// DefaultLane defines a default lane implementation. It contains a priority-nonce -// index along with core lane functionality. +// DefaultLane defines a default lane implementation. The default lane orders +// transactions by the sdk.Context priority. The default lane will accept any +// transaction that is not a part of the lane's IgnoreList. By default, the IgnoreList +// is empty and the default lane will accept any transaction. The default lane on its +// own implements the same functionality as the pre v0.47.0 tendermint mempool and proposal +// handlers. type DefaultLane struct { // Mempool defines the mempool for the lane. Mempool diff --git a/blockbuster/lanes/base/mempool.go b/blockbuster/lanes/base/mempool.go index d491f4e..e2ca98f 100644 --- a/blockbuster/lanes/base/mempool.go +++ b/blockbuster/lanes/base/mempool.go @@ -40,6 +40,8 @@ type ( } ) +// NewDefaultMempool returns a new default mempool instance. The default mempool +// orders transactions by the sdk.Context priority. func NewDefaultMempool(txEncoder sdk.TxEncoder) *DefaultMempool { return &DefaultMempool{ index: blockbuster.NewPriorityMempool( diff --git a/blockbuster/lanes/terminator/lane.go b/blockbuster/lanes/terminator/lane.go index b9ddfca..8eae9b4 100644 --- a/blockbuster/lanes/terminator/lane.go +++ b/blockbuster/lanes/terminator/lane.go @@ -35,8 +35,8 @@ type Terminator struct{} var _ blockbuster.Lane = (*Terminator)(nil) // PrepareLane is a no-op -func (t Terminator) PrepareLane(_ sdk.Context, proposal *blockbuster.Proposal, _ int64, _ blockbuster.PrepareLanesHandler) *blockbuster.Proposal { - return proposal +func (t Terminator) PrepareLane(_ sdk.Context, proposal blockbuster.BlockProposal, _ int64, _ blockbuster.PrepareLanesHandler) (blockbuster.BlockProposal, error) { + return proposal, nil } // ProcessLane is a no-op diff --git a/blockbuster/mempool.go b/blockbuster/mempool.go index 9e4c316..969cfc9 100644 --- a/blockbuster/mempool.go +++ b/blockbuster/mempool.go @@ -38,13 +38,28 @@ type ( } ) +// NewMempool returns a new Blockbuster mempool. The blockbuster mempool is +// comprised of a registry of lanes. Each lane is responsible for selecting +// transactions according to its own selection logic. The lanes are ordered +// according to their priority. The first lane in the registry has the highest +// priority. Proposals are verified according to the order of the lanes in the +// registry. Basic mempool API, such as insertion, removal, and contains, are +// delegated to the first lane that matches the transaction. Each transaction +// should only belong in one lane. func NewMempool(lanes ...Lane) *BBMempool { - return &BBMempool{ + mempool := &BBMempool{ registry: lanes, } + + if err := mempool.ValidateBasic(); err != nil { + panic(err) + } + + return mempool } -// CountTx returns the total number of transactions in the mempool. +// CountTx returns the total number of transactions in the mempool. This will +// be the sum of the number of transactions in each lane. func (m *BBMempool) CountTx() int { var total int for _, lane := range m.registry { @@ -125,6 +140,33 @@ func (m *BBMempool) Registry() []Lane { return m.registry } +// ValidateBasic validates the mempools configuration. +func (m *BBMempool) ValidateBasic() error { + sum := sdk.ZeroDec() + seenZeroMaxBlockSpace := false + + for _, lane := range m.registry { + maxBlockSpace := lane.GetMaxBlockSpace() + if maxBlockSpace.IsZero() { + seenZeroMaxBlockSpace = true + } + + sum = sum.Add(lane.GetMaxBlockSpace()) + } + + switch { + // Ensure that the sum of the lane max block space percentages is less than + // or equal to 1. + case sum.GT(sdk.OneDec()): + return fmt.Errorf("sum of lane max block space percentages must be less than or equal to 1, got %s", sum) + // Ensure that there is no unused block space. + case sum.LT(sdk.OneDec()) && !seenZeroMaxBlockSpace: + return fmt.Errorf("sum of total block space percentages will be less than 1") + } + + return nil +} + // GetLane returns the lane with the given name. func (m *BBMempool) GetLane(name string) (Lane, error) { for _, lane := range m.registry { diff --git a/blockbuster/proposals.go b/blockbuster/proposals.go new file mode 100644 index 0000000..108e907 --- /dev/null +++ b/blockbuster/proposals.go @@ -0,0 +1,197 @@ +package blockbuster + +import ( + "crypto/sha256" + "encoding/hex" + "fmt" + + "github.com/cometbft/cometbft/libs/log" + sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/skip-mev/pob/blockbuster/utils" +) + +var _ BlockProposal = (*Proposal)(nil) + +type ( + // LaneProposal defines the interface/APIs that are required for the proposal to interact + // with a lane. + LaneProposal interface { + // Logger returns the lane's logger. + Logger() log.Logger + + // GetMaxBlockSpace returns the maximum block space for the lane as a relative percentage. + GetMaxBlockSpace() sdk.Dec + + // Name returns the name of the lane. + Name() string + } + + // BlockProposal is the interface/APIs that are required for proposal creation + interacting with + // and updating proposals. BlockProposals are iteratively updated as each lane prepares its + // partial proposal. Each lane must call UpdateProposal with its partial proposal in PrepareLane. BlockProposals + // can also include vote extensions, which are included at the top of the proposal. + BlockProposal interface { + // UpdateProposal updates the proposal with the given transactions. There are a + // few invarients that are checked: + // 1. The total size of the proposal must be less than the maximum number of bytes allowed. + // 2. The total size of the partial proposal must be less than the maximum number of bytes allowed for + // the lane. + UpdateProposal(lane LaneProposal, partialProposalTxs [][]byte) error + + // GetMaxTxBytes returns the maximum number of bytes that can be included in the proposal. + GetMaxTxBytes() int64 + + // GetTotalTxBytes returns the total number of bytes currently included in the proposal. + GetTotalTxBytes() int64 + + // GetTxs returns the transactions in the proposal. + GetTxs() [][]byte + + // GetNumTxs returns the number of transactions in the proposal. + GetNumTxs() int + + // Contains returns true if the proposal contains the given transaction. + Contains(tx []byte) bool + + // AddVoteExtension adds a vote extension to the proposal. + AddVoteExtension(voteExtension []byte) + + // GetVoteExtensions returns the vote extensions in the proposal. + GetVoteExtensions() [][]byte + + // GetProposal returns all of the transactions in the proposal along with the vote extensions + // at the top of the proposal. + GetProposal() [][]byte + } + + // Proposal defines a block proposal type. + Proposal struct { + // txs is the list of transactions in the proposal. + txs [][]byte + + // voteExtensions is the list of vote extensions in the proposal. + voteExtensions [][]byte + + // cache is a cache of the selected transactions in the proposal. + cache map[string]struct{} + + // totalTxBytes is the total number of bytes currently included in the proposal. + totalTxBytes int64 + + // maxTxBytes is the maximum number of bytes that can be included in the proposal. + maxTxBytes int64 + } +) + +// NewProposal returns a new empty proposal. +func NewProposal(maxTxBytes int64) *Proposal { + return &Proposal{ + txs: make([][]byte, 0), + voteExtensions: make([][]byte, 0), + cache: make(map[string]struct{}), + maxTxBytes: maxTxBytes, + } +} + +// UpdateProposal updates the proposal with the given transactions and total size. There are a +// few invarients that are checked: +// 1. The total size of the proposal must be less than the maximum number of bytes allowed. +// 2. The total size of the partial proposal must be less than the maximum number of bytes allowed for +// the lane. +func (p *Proposal) UpdateProposal(lane LaneProposal, partialProposalTxs [][]byte) error { + if len(partialProposalTxs) == 0 { + return nil + } + + partialProposalSize := int64(0) + for _, tx := range partialProposalTxs { + partialProposalSize += int64(len(tx)) + } + + // Invarient check: Ensure that the lane did not prepare a partial proposal that is too large. + maxTxBytesForLane := utils.GetMaxTxBytesForLane(p.GetMaxTxBytes(), p.GetTotalTxBytes(), lane.GetMaxBlockSpace()) + if partialProposalSize > maxTxBytesForLane { + return fmt.Errorf( + "%s lane prepared a partial proposal that is too large: %d > %d", + lane.Name(), + partialProposalSize, + maxTxBytesForLane, + ) + } + + // Invarient check: Ensure that the lane did not prepare a block proposal that is too large. + updatedSize := p.totalTxBytes + partialProposalSize + if updatedSize > p.maxTxBytes { + return fmt.Errorf( + "lane %s prepared a block proposal that is too large: %d > %d", + lane.Name(), + p.totalTxBytes, + p.maxTxBytes, + ) + } + p.totalTxBytes = updatedSize + + lane.Logger().Info( + "adding transactions to proposal", + "lane", lane.Name(), + "num_txs", len(partialProposalTxs), + "total_tx_bytes", partialProposalSize, + "cumulative_size", updatedSize, + ) + + p.txs = append(p.txs, partialProposalTxs...) + + for _, tx := range partialProposalTxs { + txHash := sha256.Sum256(tx) + txHashStr := hex.EncodeToString(txHash[:]) + + p.cache[txHashStr] = struct{}{} + } + + return nil +} + +// GetProposal returns all of the transactions in the proposal along with the vote extensions +// at the top of the proposal. +func (p *Proposal) GetProposal() [][]byte { + return append(p.voteExtensions, p.txs...) +} + +// AddVoteExtension adds a vote extension to the proposal. +func (p *Proposal) AddVoteExtension(voteExtension []byte) { + p.voteExtensions = append(p.voteExtensions, voteExtension) +} + +// GetVoteExtensions returns the vote extensions in the proposal. +func (p *Proposal) GetVoteExtensions() [][]byte { + return p.voteExtensions +} + +// GetMaxTxBytes returns the maximum number of bytes that can be included in the proposal. +func (p *Proposal) GetMaxTxBytes() int64 { + return p.maxTxBytes +} + +// GetTotalTxBytes returns the total number of bytes currently included in the proposal. +func (p *Proposal) GetTotalTxBytes() int64 { + return p.totalTxBytes +} + +// GetTxs returns the transactions in the proposal. +func (p *Proposal) GetTxs() [][]byte { + return p.txs +} + +// GetNumTxs returns the number of transactions in the proposal. +func (p *Proposal) GetNumTxs() int { + return len(p.txs) +} + +// Contains returns true if the proposal contains the given transaction. +func (p *Proposal) Contains(tx []byte) bool { + txHash := sha256.Sum256(tx) + txHashStr := hex.EncodeToString(txHash[:]) + + _, ok := p.cache[txHashStr] + return ok +} diff --git a/blockbuster/utils/utils.go b/blockbuster/utils/utils.go index ce65f8b..938b840 100644 --- a/blockbuster/utils/utils.go +++ b/blockbuster/utils/utils.go @@ -7,7 +7,6 @@ import ( sdk "github.com/cosmos/cosmos-sdk/types" sdkmempool "github.com/cosmos/cosmos-sdk/types/mempool" - "github.com/skip-mev/pob/blockbuster" ) // GetTxHashStr returns the hex-encoded hash of the transaction alongside the @@ -52,13 +51,13 @@ func RemoveTxsFromLane(txs map[sdk.Tx]struct{}, mempool sdkmempool.Mempool) erro // GetMaxTxBytesForLane returns the maximum number of bytes that can be included in the proposal // for the given lane. -func GetMaxTxBytesForLane(proposal *blockbuster.Proposal, ratio sdk.Dec) int64 { +func GetMaxTxBytesForLane(maxTxBytes, totalTxBytes int64, ratio sdk.Dec) int64 { // In the case where the ratio is zero, we return the max tx bytes remaining. Note, the only // lane that should have a ratio of zero is the default lane. This means the default lane // will have no limit on the number of transactions it can include in a block and is only // limited by the maxTxBytes included in the PrepareProposalRequest. if ratio.IsZero() { - remainder := proposal.MaxTxBytes - proposal.TotalTxBytes + remainder := maxTxBytes - totalTxBytes if remainder < 0 { return 0 } @@ -67,5 +66,5 @@ func GetMaxTxBytesForLane(proposal *blockbuster.Proposal, ratio sdk.Dec) int64 { } // Otherwise, we calculate the max tx bytes for the lane based on the ratio. - return ratio.MulInt64(proposal.MaxTxBytes).TruncateInt().Int64() + return ratio.MulInt64(maxTxBytes).TruncateInt().Int64() } diff --git a/blockbuster/utils/utils_test.go b/blockbuster/utils/utils_test.go index 62e47a3..f38e1e5 100644 --- a/blockbuster/utils/utils_test.go +++ b/blockbuster/utils/utils_test.go @@ -1,79 +1,66 @@ -package utils +package utils_test import ( "testing" sdk "github.com/cosmos/cosmos-sdk/types" - "github.com/skip-mev/pob/blockbuster" + "github.com/skip-mev/pob/blockbuster/utils" ) func TestGetMaxTxBytesForLane(t *testing.T) { testCases := []struct { - name string - proposal *blockbuster.Proposal - ratio sdk.Dec - expected int64 + name string + maxTxBytes int64 + totalTxBytes int64 + ratio sdk.Dec + expected int64 }{ { "ratio is zero", - &blockbuster.Proposal{ - MaxTxBytes: 100, - TotalTxBytes: 50, - }, + 100, + 50, sdk.ZeroDec(), 50, }, { "ratio is zero", - &blockbuster.Proposal{ - MaxTxBytes: 100, - TotalTxBytes: 100, - }, + 100, + 100, sdk.ZeroDec(), 0, }, { "ratio is zero", - &blockbuster.Proposal{ - MaxTxBytes: 100, - TotalTxBytes: 150, - }, + 100, + 150, sdk.ZeroDec(), 0, }, { "ratio is 1", - &blockbuster.Proposal{ - MaxTxBytes: 100, - TotalTxBytes: 50, - }, + 100, + 50, sdk.OneDec(), 100, }, { "ratio is 10%", - &blockbuster.Proposal{ - MaxTxBytes: 100, - TotalTxBytes: 50, - }, + 100, + 50, sdk.MustNewDecFromStr("0.1"), 10, }, { "ratio is 25%", - &blockbuster.Proposal{ - MaxTxBytes: 100, - TotalTxBytes: 50, - }, + 100, + 50, sdk.MustNewDecFromStr("0.25"), 25, }, { "ratio is 50%", - &blockbuster.Proposal{ - MaxTxBytes: 101, - TotalTxBytes: 50, - }, + 101, + 50, sdk.MustNewDecFromStr("0.5"), 50, }, @@ -81,7 +68,7 @@ func TestGetMaxTxBytesForLane(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - actual := GetMaxTxBytesForLane(tc.proposal, tc.ratio) + actual := utils.GetMaxTxBytesForLane(tc.maxTxBytes, tc.totalTxBytes, tc.ratio) if actual != tc.expected { t.Errorf("expected %d, got %d", tc.expected, actual) }