diff --git a/.golangci.yml b/.golangci.yml index 73be3d7..e6deaac 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -29,6 +29,7 @@ linters: - typecheck - unconvert - unused + # - errcheck issues: exclude-rules: diff --git a/abci/abci.go b/abci/abci.go index af0c564..89d0644 100644 --- a/abci/abci.go +++ b/abci/abci.go @@ -242,7 +242,6 @@ func (h *ProposalHandler) ProcessProposalHandler() sdk.ProcessProposalHandler { } } } - } return abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_ACCEPT} diff --git a/abci/check_tx.go b/abci/check_tx.go index 7d96f2d..57f1d47 100644 --- a/abci/check_tx.go +++ b/abci/check_tx.go @@ -163,12 +163,8 @@ func (handler *CheckTxHandler) ValidateBidTx(ctx sdk.Context, bidTx sdk.Tx, bidI return gasInfo, fmt.Errorf("invalid bid tx; failed to decode bundled tx: %w", err) } - bidInfo, err := handler.mempool.GetAuctionBidInfo(bundledTx) - if err != nil { - return gasInfo, fmt.Errorf("invalid bid tx; failed to get auction bid info: %w", err) - } - - // Bid txs cannot be included in bundled txs. + // bid txs cannot be included in bundled txs + bidInfo, _ := handler.mempool.GetAuctionBidInfo(bundledTx) if bidInfo != nil { return gasInfo, fmt.Errorf("invalid bid tx; bundled tx cannot be a bid tx") } diff --git a/blockbuster/abci.go b/blockbuster/abci.go new file mode 100644 index 0000000..1da17ba --- /dev/null +++ b/blockbuster/abci.go @@ -0,0 +1,70 @@ +package blockbuster + +import ( + "crypto/sha256" + "encoding/hex" + + abci "github.com/cometbft/cometbft/abci/types" + "github.com/cometbft/cometbft/libs/log" + sdk "github.com/cosmos/cosmos-sdk/types" +) + +type ProposalHandler struct { + logger log.Logger + mempool Mempool +} + +func NewProposalHandler(logger log.Logger, mempool Mempool) *ProposalHandler { + return &ProposalHandler{ + logger: logger, + mempool: mempool, + } +} + +func (h *ProposalHandler) PrepareProposalHandler() sdk.PrepareProposalHandler { + return func(ctx sdk.Context, req abci.RequestPrepareProposal) abci.ResponsePrepareProposal { + var ( + selectedTxs = make(map[string][]byte) + totalTxBytes int64 + ) + + for _, l := range h.mempool.registry { + if totalTxBytes < req.MaxTxBytes { + laneTxs, err := l.PrepareLane(ctx, req.MaxTxBytes, selectedTxs) + if err != nil { + h.logger.Error("failed to prepare lane; skipping", "lane", l.Name(), "err", err) + continue + } + + for _, txBz := range laneTxs { + totalTxBytes += int64(len(txBz)) + + txHash := sha256.Sum256(txBz) + txHashStr := hex.EncodeToString(txHash[:]) + + selectedTxs[txHashStr] = txBz + } + } + } + + proposalTxs := make([][]byte, 0, len(selectedTxs)) + for _, txBz := range selectedTxs { + proposalTxs = append(proposalTxs, txBz) + } + + return abci.ResponsePrepareProposal{Txs: proposalTxs} + } +} + +func (h *ProposalHandler) ProcessProposalHandler() sdk.ProcessProposalHandler { + return func(ctx sdk.Context, req abci.RequestProcessProposal) abci.ResponseProcessProposal { + for _, l := range h.mempool.registry { + if err := l.ProcessLane(ctx, req.Txs); err != nil { + h.logger.Error("failed to process lane", "lane", l.Name(), "err", err) + return abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_REJECT} + } + } + + return abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_ACCEPT} + } +} diff --git a/blockbuster/lane.go b/blockbuster/lane.go new file mode 100644 index 0000000..30fbb2d --- /dev/null +++ b/blockbuster/lane.go @@ -0,0 +1,42 @@ +package blockbuster + +import ( + sdk "github.com/cosmos/cosmos-sdk/types" + sdkmempool "github.com/cosmos/cosmos-sdk/types/mempool" + "github.com/skip-mev/pob/mempool" +) + +type ( + // LaneConfig defines the configuration for a lane. + LaneConfig[C comparable] struct { + // XXX: For now we use the PriorityNonceMempoolConfig as the base config, + // which should be removed once Cosmos SDK v0.48 is released. + mempool.PriorityNonceMempoolConfig[C] + } + + // Lane defines an interface used for block construction + Lane interface { + sdkmempool.Mempool + + // Name returns the name of the lane. + Name() string + + // Match determines if a transaction belongs to this lane. + Match(tx sdk.Tx) bool + + // VerifyTx verifies the transaction belonging to this lane. + VerifyTx(ctx sdk.Context, tx sdk.Tx) error + + // Contains returns true if the mempool contains the given transaction. + Contains(tx sdk.Tx) (bool, error) + + // PrepareLane which builds a portion of the block. Inputs include the max + // number of bytes that can be included in the block and the selected transactions + // thus from from previous lane(s) as mapping from their HEX-encoded hash to + // the raw transaction. + PrepareLane(ctx sdk.Context, maxTxBytes int64, selectedTxs map[string][]byte) ([][]byte, error) + + // ProcessLane which verifies the lane's portion of a proposed block. + ProcessLane(ctx sdk.Context, proposalTxs [][]byte) error + } +) diff --git a/blockbuster/mempool.go b/blockbuster/mempool.go new file mode 100644 index 0000000..b1d3324 --- /dev/null +++ b/blockbuster/mempool.go @@ -0,0 +1,81 @@ +package blockbuster + +import ( + "context" + "errors" + + sdk "github.com/cosmos/cosmos-sdk/types" + sdkmempool "github.com/cosmos/cosmos-sdk/types/mempool" +) + +var _ sdkmempool.Mempool = (*Mempool)(nil) + +// Mempool defines the Blockbuster mempool implement. It contains a registry +// of lanes, which allows for customizable block proposal construction. +type Mempool struct { + registry []Lane +} + +func NewMempool(lanes ...Lane) *Mempool { + return &Mempool{ + registry: lanes, + } +} + +// TODO: Consider using a tx cache in Mempool and returning the length of that +// cache instead of relying on lane count tracking. +func (m *Mempool) CountTx() int { + var total int + for _, lane := range m.registry { + // TODO: If a global lane exists, we assume that lane has all transactions + // and we return the total. + // + // if lane.Name() == LaneNameGlobal { + // return lane.CountTx() + // } + + total += lane.CountTx() + } + + return total +} + +// Insert inserts a transaction into every lane that it matches. Insertion will +// be attempted on all lanes, even if an error is encountered. +func (m *Mempool) Insert(ctx context.Context, tx sdk.Tx) error { + errs := make([]error, 0, len(m.registry)) + + for _, lane := range m.registry { + if lane.Match(tx) { + err := lane.Insert(ctx, tx) + errs = append(errs, err) + } + } + + return errors.Join(errs...) +} + +// Insert returns a nil iterator. +// +// TODO: +// - Determine if it even makes sense to return an iterator. What does that even +// mean in the context where you have multiple lanes? +// - Perhaps consider implementing and returning a no-op iterator? +func (m *Mempool) Select(_ context.Context, _ [][]byte) sdkmempool.Iterator { + return nil +} + +// Remove removes a transaction from every lane that it matches. Removal will be +// attempted on all lanes, even if an error is encountered. +func (m *Mempool) Remove(tx sdk.Tx) error { + errs := make([]error, 0, len(m.registry)) + + for _, lane := range m.registry { + if lane.Match(tx) { + err := lane.Remove(tx) + errs = append(errs, err) + } + } + + return errors.Join(errs...) +} diff --git a/blockbuster/tob_lane.go b/blockbuster/tob_lane.go new file mode 100644 index 0000000..e463ff2 --- /dev/null +++ b/blockbuster/tob_lane.go @@ -0,0 +1,347 @@ +package blockbuster + +import ( + "bytes" + "context" + "crypto/sha256" + "encoding/hex" + "errors" + "fmt" + + "github.com/cometbft/cometbft/libs/log" + sdk "github.com/cosmos/cosmos-sdk/types" + sdkmempool "github.com/cosmos/cosmos-sdk/types/mempool" + "github.com/skip-mev/pob/mempool" +) + +const ( + // LaneNameTOB defines the name of the top-of-block auction lane. + LaneNameTOB = "tob" +) + +type ( + // AuctionBidInfo defines the information about a bid to the auction house. + AuctionBidInfo struct { + Bidder sdk.AccAddress + Bid sdk.Coin + Transactions [][]byte + Timeout uint64 + Signers []map[string]struct{} + } + + // AuctionFactory defines the interface for processing auction transactions. It is + // a wrapper around all of the functionality that each application chain must implement + // in order for auction processing to work. + AuctionFactory interface { + // WrapBundleTransaction defines a function that wraps a bundle transaction into a sdk.Tx. Since + // this is a potentially expensive operation, we allow each application chain to define how + // they want to wrap the transaction such that it is only called when necessary (i.e. when the + // transaction is being considered in the proposal handlers). + WrapBundleTransaction(tx []byte) (sdk.Tx, error) + + // GetAuctionBidInfo defines a function that returns the bid info from an auction transaction. + GetAuctionBidInfo(tx sdk.Tx) (*AuctionBidInfo, error) + } +) + +var _ Lane = (*TOBLane)(nil) + +type TOBLane struct { + logger log.Logger + index sdkmempool.Mempool + af AuctionFactory + txEncoder sdk.TxEncoder + txDecoder sdk.TxDecoder + anteHandler sdk.AnteHandler + + // txIndex is a map of all transactions in the mempool. It is used + // to quickly check if a transaction is already in the mempool. + txIndex map[string]struct{} +} + +func NewTOBLane(logger log.Logger, txDecoder sdk.TxDecoder, txEncoder sdk.TxEncoder, maxTx int, af AuctionFactory, anteHandler sdk.AnteHandler) *TOBLane { + return &TOBLane{ + logger: logger, + index: mempool.NewPriorityMempool( + mempool.PriorityNonceMempoolConfig[int64]{ + TxPriority: mempool.NewDefaultTxPriority(), + MaxTx: maxTx, + }, + ), + af: af, + txEncoder: txEncoder, + txDecoder: txDecoder, + anteHandler: anteHandler, + txIndex: make(map[string]struct{}), + } +} + +func (l *TOBLane) Name() string { + return LaneNameTOB +} + +func (l *TOBLane) Match(tx sdk.Tx) bool { + bidInfo, err := l.af.GetAuctionBidInfo(tx) + return bidInfo != nil && err == nil +} + +func (l *TOBLane) Contains(tx sdk.Tx) (bool, error) { + txHashStr, err := l.getTxHashStr(tx) + if err != nil { + return false, fmt.Errorf("failed to get tx hash string: %w", err) + } + + _, ok := l.txIndex[txHashStr] + return ok, nil +} + +func (l *TOBLane) VerifyTx(ctx sdk.Context, bidTx sdk.Tx) error { + bidInfo, err := l.af.GetAuctionBidInfo(bidTx) + if err != nil { + return fmt.Errorf("failed to get auction bid info: %w", err) + } + + // verify the top-level bid transaction + ctx, err = l.verifyTx(ctx, bidTx) + if err != nil { + return fmt.Errorf("invalid bid tx; failed to execute ante handler: %w", err) + } + + // verify all of the bundled transactions + for _, tx := range bidInfo.Transactions { + bundledTx, err := l.af.WrapBundleTransaction(tx) + if err != nil { + return fmt.Errorf("invalid bid tx; failed to decode bundled tx: %w", err) + } + + // bid txs cannot be included in bundled txs + bidInfo, _ := l.af.GetAuctionBidInfo(bundledTx) + if bidInfo != nil { + return fmt.Errorf("invalid bid tx; bundled tx cannot be a bid tx") + } + + if ctx, err = l.verifyTx(ctx, bundledTx); err != nil { + return fmt.Errorf("invalid bid tx; failed to execute bundled transaction: %w", err) + } + } + + return nil +} + +// PrepareLane which builds a portion of the block. Inputs a cache of transactions +// that have already been included by a previous lane. +func (l *TOBLane) PrepareLane(ctx sdk.Context, maxTxBytes int64, selectedTxs map[string][]byte) ([][]byte, error) { + var tmpSelectedTxs [][]byte + + bidTxIterator := l.index.Select(ctx, nil) + txsToRemove := make(map[sdk.Tx]struct{}, 0) + + // Attempt to select the highest bid transaction that is valid and whose + // bundled transactions are valid. +selectBidTxLoop: + for ; bidTxIterator != nil; bidTxIterator = bidTxIterator.Next() { + cacheCtx, write := ctx.CacheContext() + tmpBidTx := bidTxIterator.Tx() + + // if the transaction is already in the (partial) block proposal, we skip it + txHash, err := l.getTxHashStr(tmpBidTx) + if err != nil { + return nil, fmt.Errorf("failed to get bid tx hash: %w", err) + } + if _, ok := selectedTxs[txHash]; ok { + continue selectBidTxLoop + } + + bidTxBz, err := l.txEncoder(tmpBidTx) + if err != nil { + txsToRemove[tmpBidTx] = struct{}{} + continue selectBidTxLoop + } + + bidTxSize := int64(len(bidTxBz)) + if bidTxSize <= maxTxBytes { + if err := l.VerifyTx(cacheCtx, tmpBidTx); err != nil { + // Some transactions in the bundle may be malformed or invalid, so we + // remove the bid transaction and try the next top bid. + txsToRemove[tmpBidTx] = struct{}{} + continue selectBidTxLoop + } + + bidInfo, err := l.af.GetAuctionBidInfo(tmpBidTx) + if bidInfo == nil || err != nil { + // Some transactions in the bundle may be malformed or invalid, so we + // remove the bid transaction and try the next top bid. + txsToRemove[tmpBidTx] = struct{}{} + continue selectBidTxLoop + } + + // store the bytes of each ref tx as sdk.Tx bytes in order to build a valid proposal + bundledTxBz := make([][]byte, len(bidInfo.Transactions)) + for index, rawRefTx := range bidInfo.Transactions { + bundleTxBz := make([]byte, len(rawRefTx)) + copy(bundleTxBz, rawRefTx) + bundledTxBz[index] = rawRefTx + } + + // At this point, both the bid transaction itself and all the bundled + // transactions are valid. So we select the bid transaction along with + // all the bundled transactions. We also mark these transactions as seen and + // update the total size selected thus far. + tmpSelectedTxs = append(tmpSelectedTxs, bidTxBz) + tmpSelectedTxs = append(tmpSelectedTxs, bundledTxBz...) + + // Write the cache context to the original context when we know we have a + // valid top of block bundle. + write() + + break selectBidTxLoop + } + + txsToRemove[tmpBidTx] = struct{}{} + l.logger.Info( + "failed to select auction bid tx; tx size is too large", + "tx_size", bidTxSize, + "max_size", maxTxBytes, + ) + } + + // remove all invalid transactions from the mempool + for tx := range txsToRemove { + if err := l.Remove(tx); err != nil { + return nil, err + } + } + + return tmpSelectedTxs, nil +} + +// ProcessLane which verifies the lane's portion of a proposed block. +func (l *TOBLane) ProcessLane(ctx sdk.Context, proposalTxs [][]byte) error { + for index, txBz := range proposalTxs { + tx, err := l.txDecoder(txBz) + if err != nil { + return err + } + + // skip transaction if it does not match this lane + if !l.Match(tx) { + continue + } + + _, err = l.processProposalVerifyTx(ctx, txBz) + if err != nil { + return err + } + + bidInfo, err := l.af.GetAuctionBidInfo(tx) + if err != nil { + return err + } + + // If the transaction is an auction bid, then we need to ensure that it is + // the first transaction in the block proposal and that the order of + // transactions in the block proposal follows the order of transactions in + // the bid. + if bidInfo != nil { + if index != 0 { + return errors.New("auction bid must be the first transaction in the block proposal") + } + + bundledTransactions := bidInfo.Transactions + if len(proposalTxs) < len(bundledTransactions)+1 { + return errors.New("block proposal does not contain enough transactions to match the bundled transactions in the auction bid") + } + + for i, refTxRaw := range bundledTransactions { + // Wrap and then encode the bundled transaction to ensure that the underlying + // reference transaction can be processed as an sdk.Tx. + wrappedTx, err := l.af.WrapBundleTransaction(refTxRaw) + if err != nil { + return err + } + + refTxBz, err := l.txEncoder(wrappedTx) + if err != nil { + return err + } + + if !bytes.Equal(refTxBz, proposalTxs[i+1]) { + return errors.New("block proposal does not match the bundled transactions in the auction bid") + } + } + } + } + + return nil +} + +func (l *TOBLane) Insert(goCtx context.Context, tx sdk.Tx) error { + txHashStr, err := l.getTxHashStr(tx) + if err != nil { + return err + } + + if err := l.index.Insert(goCtx, tx); err != nil { + return fmt.Errorf("failed to insert tx into auction index: %w", err) + } + + l.txIndex[txHashStr] = struct{}{} + return nil +} + +func (l *TOBLane) Select(goCtx context.Context, txs [][]byte) sdkmempool.Iterator { + return l.index.Select(goCtx, txs) +} + +func (l *TOBLane) CountTx() int { + return l.index.CountTx() +} + +func (l *TOBLane) Remove(tx sdk.Tx) error { + txHashStr, err := l.getTxHashStr(tx) + if err != nil { + return fmt.Errorf("failed to get tx hash string: %w", err) + } + + if err := l.index.Remove(tx); err != nil && !errors.Is(err, sdkmempool.ErrTxNotFound) { + return fmt.Errorf("failed to remove invalid transaction from the mempool: %w", err) + } + + delete(l.txIndex, txHashStr) + return nil +} + +func (l *TOBLane) processProposalVerifyTx(ctx sdk.Context, txBz []byte) (sdk.Tx, error) { + tx, err := l.txDecoder(txBz) + if err != nil { + return nil, err + } + + if _, err := l.verifyTx(ctx, tx); err != nil { + return nil, err + } + + return tx, nil +} + +func (l *TOBLane) verifyTx(ctx sdk.Context, tx sdk.Tx) (sdk.Context, error) { + if l.anteHandler != nil { + newCtx, err := l.anteHandler(ctx, tx, false) + return newCtx, err + } + + return ctx, nil +} + +// getTxHashStr returns the transaction hash string for a given transaction. +func (l *TOBLane) getTxHashStr(tx sdk.Tx) (string, error) { + txBz, err := l.txEncoder(tx) + if err != nil { + return "", fmt.Errorf("failed to encode transaction: %w", err) + } + + txHash := sha256.Sum256(txBz) + txHashStr := hex.EncodeToString(txHash[:]) + + return txHashStr, nil +} diff --git a/mempool/mempool.go b/mempool/mempool.go index fed8a7d..156f527 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -22,7 +22,7 @@ type ( // GetTopAuctionTx returns the top auction bid transaction in the mempool. GetTopAuctionTx(ctx context.Context) sdk.Tx - // CoutnAuctionTx returns the number of auction bid transactions in the mempool. + // CountAuctionTx returns the number of auction bid transactions in the mempool. CountAuctionTx() int // AuctionBidSelect returns an iterator over the auction bid transactions in the mempool.