From 1b928d883ae9d4b4d1c62bbec5b4b8909dc5b37d Mon Sep 17 00:00:00 2001 From: David Terpay <35130517+davidterpay@users.noreply.github.com> Date: Mon, 5 Jun 2023 12:18:06 -0400 Subject: [PATCH] feat(BB): Creating a chain decorator for process lanes (#162) --- .golangci.yml | 1 - blockbuster/abci.go | 70 ------------------- blockbuster/abci/abci.go | 101 +++++++++++++++++++++++++++ blockbuster/lane.go | 10 ++- blockbuster/lanes/auction/abci.go | 36 +++++++--- blockbuster/lanes/base/abci.go | 89 +++++++++++++++++++++-- blockbuster/lanes/terminator/lane.go | 84 ++++++++++++++++++++++ blockbuster/mempool.go | 38 ++++++---- blockbuster/utils.go | 12 ++++ 9 files changed, 339 insertions(+), 102 deletions(-) delete mode 100644 blockbuster/abci.go create mode 100644 blockbuster/abci/abci.go create mode 100644 blockbuster/lanes/terminator/lane.go diff --git a/.golangci.yml b/.golangci.yml index e6deaac..85afc24 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -10,7 +10,6 @@ run: linters: disable-all: true enable: - - depguard - dogsled - exportloopref - goconst diff --git a/blockbuster/abci.go b/blockbuster/abci.go deleted file mode 100644 index 1da17ba..0000000 --- a/blockbuster/abci.go +++ /dev/null @@ -1,70 +0,0 @@ -package blockbuster - -import ( - "crypto/sha256" - "encoding/hex" - - abci "github.com/cometbft/cometbft/abci/types" - "github.com/cometbft/cometbft/libs/log" - sdk "github.com/cosmos/cosmos-sdk/types" -) - -type ProposalHandler struct { - logger log.Logger - mempool Mempool -} - -func NewProposalHandler(logger log.Logger, mempool Mempool) *ProposalHandler { - return &ProposalHandler{ - logger: logger, - mempool: mempool, - } -} - -func (h *ProposalHandler) PrepareProposalHandler() sdk.PrepareProposalHandler { - return func(ctx sdk.Context, req abci.RequestPrepareProposal) abci.ResponsePrepareProposal { - var ( - selectedTxs = make(map[string][]byte) - totalTxBytes int64 - ) - - for _, l := range h.mempool.registry { - if totalTxBytes < req.MaxTxBytes { - laneTxs, err := l.PrepareLane(ctx, req.MaxTxBytes, selectedTxs) - if err != nil { - h.logger.Error("failed to prepare lane; skipping", "lane", l.Name(), "err", err) - continue - } - - for _, txBz := range laneTxs { - totalTxBytes += int64(len(txBz)) - - txHash := sha256.Sum256(txBz) - txHashStr := hex.EncodeToString(txHash[:]) - - selectedTxs[txHashStr] = txBz - } - } - } - - proposalTxs := make([][]byte, 0, len(selectedTxs)) - for _, txBz := range selectedTxs { - proposalTxs = append(proposalTxs, txBz) - } - - return abci.ResponsePrepareProposal{Txs: proposalTxs} - } -} - -func (h *ProposalHandler) ProcessProposalHandler() sdk.ProcessProposalHandler { - return func(ctx sdk.Context, req abci.RequestProcessProposal) abci.ResponseProcessProposal { - for _, l := range h.mempool.registry { - if err := l.ProcessLane(ctx, req.Txs); err != nil { - h.logger.Error("failed to process lane", "lane", l.Name(), "err", err) - return abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_REJECT} - } - } - - return abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_ACCEPT} - } -} diff --git a/blockbuster/abci/abci.go b/blockbuster/abci/abci.go new file mode 100644 index 0000000..f2650c2 --- /dev/null +++ b/blockbuster/abci/abci.go @@ -0,0 +1,101 @@ +package abci + +import ( + "crypto/sha256" + "encoding/hex" + + abci "github.com/cometbft/cometbft/abci/types" + "github.com/cometbft/cometbft/libs/log" + sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/skip-mev/pob/blockbuster" + "github.com/skip-mev/pob/blockbuster/lanes/terminator" +) + +type ( + // ProposalHandler is a wrapper around the ABCI++ PrepareProposal and ProcessProposal + // handlers. + ProposalHandler struct { + logger log.Logger + mempool blockbuster.Mempool + txEncoder sdk.TxEncoder + processLanesHandler blockbuster.ProcessLanesHandler + } +) + +// NewProposalHandler returns a new ProposalHandler. +func NewProposalHandler(logger log.Logger, mempool blockbuster.Mempool, txEncoder sdk.TxEncoder) *ProposalHandler { + return &ProposalHandler{ + logger: logger, + mempool: mempool, + txEncoder: txEncoder, + processLanesHandler: ChainProcessLanes(mempool.Registry()...), + } +} + +// ChainProcessLane chains together the proposal verification logic from each lane +// into a single function. The first lane in the chain is the first lane to be verified and +// the last lane in the chain is the last lane to be verified. +func ChainProcessLanes(chain ...blockbuster.Lane) blockbuster.ProcessLanesHandler { + if len(chain) == 0 { + return nil + } + + // Handle non-terminated decorators chain + if (chain[len(chain)-1] != terminator.Terminator{}) { + chain = append(chain, terminator.Terminator{}) + } + + return func(ctx sdk.Context, proposalTxs [][]byte) (sdk.Context, error) { + return chain[0].ProcessLane(ctx, proposalTxs, ChainProcessLanes(chain[1:]...)) + } +} + +func (h *ProposalHandler) PrepareProposalHandler() sdk.PrepareProposalHandler { + return func(ctx sdk.Context, req abci.RequestPrepareProposal) abci.ResponsePrepareProposal { + var ( + selectedTxs = make(map[string][]byte) + totalTxBytes int64 + ) + + for _, l := range h.mempool.Registry() { + if totalTxBytes < req.MaxTxBytes { + laneTxs, err := l.PrepareLane(ctx, req.MaxTxBytes, selectedTxs) + if err != nil { + h.logger.Error("failed to prepare lane; skipping", "lane", l.Name(), "err", err) + continue + } + + for _, txBz := range laneTxs { + totalTxBytes += int64(len(txBz)) + + txHash := sha256.Sum256(txBz) + txHashStr := hex.EncodeToString(txHash[:]) + + selectedTxs[txHashStr] = txBz + } + } + } + + proposalTxs := make([][]byte, 0, len(selectedTxs)) + for _, txBz := range selectedTxs { + proposalTxs = append(proposalTxs, txBz) + } + + return abci.ResponsePrepareProposal{Txs: proposalTxs} + } +} + +// ProcessProposalHandler processes the proposal by verifying all transactions in the proposal +// according to each lane's verification logic. We verify proposals in a greedy fashion. +// If a lane's portion of the proposal is invalid, we reject the proposal. After a lane's portion +// of the proposal is verified, we pass the remaining transactions to the next lane in the chain. +func (h *ProposalHandler) ProcessProposalHandler() sdk.ProcessProposalHandler { + return func(ctx sdk.Context, req abci.RequestProcessProposal) abci.ResponseProcessProposal { + if _, err := h.processLanesHandler(ctx, req.Txs); err != nil { + h.logger.Error("failed to process lanes", "err", err) + return abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_REJECT} + } + + return abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_ACCEPT} + } +} diff --git a/blockbuster/lane.go b/blockbuster/lane.go index d37eae7..8ffd631 100644 --- a/blockbuster/lane.go +++ b/blockbuster/lane.go @@ -7,6 +7,12 @@ import ( ) type ( + + // ProcessLanesHandler wraps all of the lanes Process functions into a single chained + // function. You can think of it like an AnteHandler, but for processing proposals in the + // context of lanes instead of modules. + ProcessLanesHandler func(ctx sdk.Context, proposalTxs [][]byte) (sdk.Context, error) + // BaseLaneConfig defines the basic functionality needed for a lane. BaseLaneConfig struct { Logger log.Logger @@ -37,8 +43,8 @@ type ( // the raw transaction. PrepareLane(ctx sdk.Context, maxTxBytes int64, selectedTxs map[string][]byte) ([][]byte, error) - // ProcessLane which verifies the lane's portion of a proposed block. - ProcessLane(ctx sdk.Context, proposalTxs [][]byte) error + // ProcessLane verifies this lane's portion of a proposed block. + ProcessLane(ctx sdk.Context, proposalTxs [][]byte, next ProcessLanesHandler) (sdk.Context, error) } ) diff --git a/blockbuster/lanes/auction/abci.go b/blockbuster/lanes/auction/abci.go index a20900a..1a9e397 100644 --- a/blockbuster/lanes/auction/abci.go +++ b/blockbuster/lanes/auction/abci.go @@ -73,6 +73,17 @@ selectBidTxLoop: continue selectBidTxLoop } + hash, err := blockbuster.GetTxHashStr(l.cfg.TxEncoder, sdkTx) + if err != nil { + txsToRemove[tmpBidTx] = struct{}{} + continue selectBidTxLoop + } + + // if the transaction is already in the (partial) block proposal, we skip it. + if _, ok := selectedTxs[hash]; ok { + continue selectBidTxLoop + } + bundleTxBz := make([]byte, len(sdkTxBz)) copy(bundleTxBz, sdkTxBz) bundledTxBz[index] = sdkTxBz @@ -115,16 +126,19 @@ selectBidTxLoop: // block proposal is invalid. The block proposal is invalid if it does not // respect the ordering of transactions in the bid transaction or if the bid/bundled // transactions are invalid. -func (l *TOBLane) ProcessLane(ctx sdk.Context, proposalTxs [][]byte) error { +func (l *TOBLane) ProcessLane(ctx sdk.Context, proposalTxs [][]byte, next blockbuster.ProcessLanesHandler) (sdk.Context, error) { + // Track the index of the first transaction that does not belong to this lane. + endIndex := 0 + for index, txBz := range proposalTxs { tx, err := l.cfg.TxDecoder(txBz) if err != nil { - return err + return ctx, err } bidInfo, err := l.GetAuctionBidInfo(tx) if err != nil { - return err + return ctx, fmt.Errorf("failed to get auction bid info for tx %w", err) } // If the transaction is an auction bid, then we need to ensure that it is @@ -133,12 +147,12 @@ func (l *TOBLane) ProcessLane(ctx sdk.Context, proposalTxs [][]byte) error { // the bid. if bidInfo != nil { if index != 0 { - return errors.New("auction bid must be the first transaction in the block proposal") + return ctx, fmt.Errorf("block proposal did not place auction bid transaction at the top of the lane: %d", index) } bundledTransactions := bidInfo.Transactions if len(proposalTxs) < len(bundledTransactions)+1 { - return errors.New("block proposal does not contain enough transactions to match the bundled transactions in the auction bid") + return ctx, errors.New("block proposal does not contain enough transactions to match the bundled transactions in the auction bid") } for i, refTxRaw := range bundledTransactions { @@ -146,27 +160,29 @@ func (l *TOBLane) ProcessLane(ctx sdk.Context, proposalTxs [][]byte) error { // reference transaction can be processed as an sdk.Tx. wrappedTx, err := l.WrapBundleTransaction(refTxRaw) if err != nil { - return err + return ctx, err } refTxBz, err := l.cfg.TxEncoder(wrappedTx) if err != nil { - return err + return ctx, err } if !bytes.Equal(refTxBz, proposalTxs[i+1]) { - return errors.New("block proposal does not match the bundled transactions in the auction bid") + return ctx, errors.New("block proposal does not match the bundled transactions in the auction bid") } } // Verify the bid transaction. if err = l.VerifyTx(ctx, tx); err != nil { - return err + return ctx, err } + + endIndex += len(bundledTransactions) + 1 } } - return nil + return next(ctx, proposalTxs[endIndex:]) } // VerifyTx will verify that the bid transaction and all of its bundled diff --git a/blockbuster/lanes/base/abci.go b/blockbuster/lanes/base/abci.go index e23bf7b..bcfe935 100644 --- a/blockbuster/lanes/base/abci.go +++ b/blockbuster/lanes/base/abci.go @@ -1,15 +1,90 @@ package base -import sdk "github.com/cosmos/cosmos-sdk/types" +import ( + "fmt" -func (l *DefaultLane) PrepareLane(sdk.Context, int64, map[string][]byte) ([][]byte, error) { - panic("implement me") + sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/skip-mev/pob/blockbuster" +) + +// PrepareLane will prepare a partial proposal for the base lane. +func (l *DefaultLane) PrepareLane(ctx sdk.Context, maxTxBytes int64, selectedTxs map[string][]byte) ([][]byte, error) { + txs := make([][]byte, 0) + txsToRemove := make(map[sdk.Tx]struct{}, 0) + totalSize := int64(0) + + // Select all transactions in the mempool that are valid and not already in the + // partial proposal. + for iterator := l.Mempool.Select(ctx, nil); iterator != nil; iterator = iterator.Next() { + tx := iterator.Tx() + + txBytes, err := l.cfg.TxEncoder(tx) + if err != nil { + txsToRemove[tx] = struct{}{} + continue + } + + // if the transaction is already in the (partial) block proposal, we skip it. + hash, err := blockbuster.GetTxHashStr(l.cfg.TxEncoder, tx) + if err != nil { + txsToRemove[tx] = struct{}{} + continue + } + if _, ok := selectedTxs[hash]; ok { + continue + } + + // If the transaction is too large, we skip it. + txSize := int64(len(txBytes)) + if updatedSize := totalSize + txSize; updatedSize > maxTxBytes { + break + } + + // Verify the transaction. + if err := l.VerifyTx(ctx, tx); err != nil { + txsToRemove[tx] = struct{}{} + continue + } + + totalSize += txSize + txs = append(txs, txBytes) + } + + // Remove all transactions that were invalid during the creation of the partial proposal. + if err := blockbuster.RemoveTxsFromLane(txsToRemove, l.Mempool); err != nil { + return nil, fmt.Errorf("failed to remove txs from mempool for lane %s: %w", l.Name(), err) + } + + return txs, nil } -func (l *DefaultLane) ProcessLane(sdk.Context, [][]byte) error { - panic("implement me") +// ProcessLane verifies the default lane's portion of a block proposal. +func (l *DefaultLane) ProcessLane(ctx sdk.Context, proposalTxs [][]byte, next blockbuster.ProcessLanesHandler) (sdk.Context, error) { + for index, tx := range proposalTxs { + tx, err := l.cfg.TxDecoder(tx) + if err != nil { + return ctx, fmt.Errorf("failed to decode tx: %w", err) + } + + if l.Match(tx) { + if err := l.VerifyTx(ctx, tx); err != nil { + return ctx, fmt.Errorf("failed to verify tx: %w", err) + } + } else { + return next(ctx, proposalTxs[index:]) + } + } + + // This means we have processed all transactions in the proposal. + return ctx, nil } -func (l *DefaultLane) VerifyTx(sdk.Context, sdk.Tx) error { - panic("implement me") +// VerifyTx does basic verification of the transaction using the ante handler. +func (l *DefaultLane) VerifyTx(ctx sdk.Context, tx sdk.Tx) error { + if l.cfg.AnteHandler != nil { + _, err := l.cfg.AnteHandler(ctx, tx, false) + return err + } + + return nil } diff --git a/blockbuster/lanes/terminator/lane.go b/blockbuster/lanes/terminator/lane.go new file mode 100644 index 0000000..d3f4bd0 --- /dev/null +++ b/blockbuster/lanes/terminator/lane.go @@ -0,0 +1,84 @@ +package terminator + +import ( + "context" + "fmt" + + sdk "github.com/cosmos/cosmos-sdk/types" + sdkmempool "github.com/cosmos/cosmos-sdk/types/mempool" + "github.com/skip-mev/pob/blockbuster" +) + +// Terminator Lane will get added to the chain to simplify chaining code so that we +// don't need to check if next == nil further up the chain +// +// sniped from the sdk +// +// ______ +// <((((((\\\ +// / . }\ +// ;--..--._|} +// (\ '--/\--' ) +// \\ | '-' :'| +// \\ . -==- .-| +// \\ \.__.' \--._ +// [\\ __.--| // _/'--. +// \ \\ .'-._ ('-----'/ __/ \ +// \ \\ / __>| | '--. | +// \ \\ | \ | / / / +// \ '\ / \ | | _/ / +// \ \ \ | | / / +// snd \ \ \ / +type Terminator struct{} + +var _ blockbuster.Lane = (*Terminator)(nil) + +// PrepareLane is a no-op +func (t Terminator) PrepareLane(_ sdk.Context, _ int64, _ map[string][]byte) ([][]byte, error) { + return nil, nil +} + +// ProcessLane is a no-op +func (t Terminator) ProcessLane(ctx sdk.Context, _ [][]byte, _ blockbuster.ProcessLanesHandler) (sdk.Context, error) { + return ctx, nil +} + +// Name returns the name of the lane +func (t Terminator) Name() string { + return "Terminator" +} + +// Match is a no-op +func (t Terminator) Match(sdk.Tx) bool { + return false +} + +// VerifyTx is a no-op +func (t Terminator) VerifyTx(sdk.Context, sdk.Tx) error { + return fmt.Errorf("Terminator lane should not be called") +} + +// Contains is a no-op +func (t Terminator) Contains(sdk.Tx) (bool, error) { + return false, nil +} + +// CountTx is a no-op +func (t Terminator) CountTx() int { + return 0 +} + +// Insert is a no-op +func (t Terminator) Insert(context.Context, sdk.Tx) error { + return nil +} + +// Remove is a no-op +func (t Terminator) Remove(sdk.Tx) error { + return nil +} + +// Select is a no-op +func (t Terminator) Select(context.Context, [][]byte) sdkmempool.Iterator { + return nil +} diff --git a/blockbuster/mempool.go b/blockbuster/mempool.go index b1d3324..efece15 100644 --- a/blockbuster/mempool.go +++ b/blockbuster/mempool.go @@ -8,23 +8,32 @@ import ( sdkmempool "github.com/cosmos/cosmos-sdk/types/mempool" ) -var _ sdkmempool.Mempool = (*Mempool)(nil) +var _ Mempool = (*BBMempool)(nil) -// Mempool defines the Blockbuster mempool implement. It contains a registry -// of lanes, which allows for customizable block proposal construction. -type Mempool struct { - registry []Lane -} +type ( + Mempool interface { + sdkmempool.Mempool -func NewMempool(lanes ...Lane) *Mempool { - return &Mempool{ + // Registry returns the mempool's lane registry. + Registry() []Lane + } + + // Mempool defines the Blockbuster mempool implement. It contains a registry + // of lanes, which allows for customizable block proposal construction. + BBMempool struct { + registry []Lane + } +) + +func NewMempool(lanes ...Lane) *BBMempool { + return &BBMempool{ registry: lanes, } } // TODO: Consider using a tx cache in Mempool and returning the length of that // cache instead of relying on lane count tracking. -func (m *Mempool) CountTx() int { +func (m *BBMempool) CountTx() int { var total int for _, lane := range m.registry { // TODO: If a global lane exists, we assume that lane has all transactions @@ -42,7 +51,7 @@ func (m *Mempool) CountTx() int { // Insert inserts a transaction into every lane that it matches. Insertion will // be attempted on all lanes, even if an error is encountered. -func (m *Mempool) Insert(ctx context.Context, tx sdk.Tx) error { +func (m *BBMempool) Insert(ctx context.Context, tx sdk.Tx) error { errs := make([]error, 0, len(m.registry)) for _, lane := range m.registry { @@ -61,13 +70,13 @@ func (m *Mempool) Insert(ctx context.Context, tx sdk.Tx) error { // - Determine if it even makes sense to return an iterator. What does that even // mean in the context where you have multiple lanes? // - Perhaps consider implementing and returning a no-op iterator? -func (m *Mempool) Select(_ context.Context, _ [][]byte) sdkmempool.Iterator { +func (m *BBMempool) Select(_ context.Context, _ [][]byte) sdkmempool.Iterator { return nil } // Remove removes a transaction from every lane that it matches. Removal will be // attempted on all lanes, even if an error is encountered. -func (m *Mempool) Remove(tx sdk.Tx) error { +func (m *BBMempool) Remove(tx sdk.Tx) error { errs := make([]error, 0, len(m.registry)) for _, lane := range m.registry { @@ -79,3 +88,8 @@ func (m *Mempool) Remove(tx sdk.Tx) error { return errors.Join(errs...) } + +// Registry returns the mempool's lane registry. +func (m *BBMempool) Registry() []Lane { + return m.registry +} diff --git a/blockbuster/utils.go b/blockbuster/utils.go index 0090c83..51d097c 100644 --- a/blockbuster/utils.go +++ b/blockbuster/utils.go @@ -6,6 +6,7 @@ import ( "fmt" sdk "github.com/cosmos/cosmos-sdk/types" + sdkmempool "github.com/cosmos/cosmos-sdk/types/mempool" ) // GetTxHashStr returns the hex-encoded hash of the transaction. @@ -20,3 +21,14 @@ func GetTxHashStr(txEncoder sdk.TxEncoder, tx sdk.Tx) (string, error) { return txHashStr, nil } + +// RemoveTxsFromLane removes the transactions from the given lane's mempool. +func RemoveTxsFromLane(txs map[sdk.Tx]struct{}, mempool sdkmempool.Mempool) error { + for tx := range txs { + if err := mempool.Remove(tx); err != nil { + return err + } + } + + return nil +}