feat: Blockbuster MVP + TOB Lane (#145)

This commit is contained in:
Aleksandr Bezobchuk 2023-05-30 16:03:48 -04:00 committed by GitHub
parent 63a98d756f
commit 957780d934
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 544 additions and 8 deletions

View File

@ -29,6 +29,7 @@ linters:
- typecheck
- unconvert
- unused
# - errcheck
issues:
exclude-rules:

View File

@ -242,7 +242,6 @@ func (h *ProposalHandler) ProcessProposalHandler() sdk.ProcessProposalHandler {
}
}
}
}
return abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_ACCEPT}

View File

@ -163,12 +163,8 @@ func (handler *CheckTxHandler) ValidateBidTx(ctx sdk.Context, bidTx sdk.Tx, bidI
return gasInfo, fmt.Errorf("invalid bid tx; failed to decode bundled tx: %w", err)
}
bidInfo, err := handler.mempool.GetAuctionBidInfo(bundledTx)
if err != nil {
return gasInfo, fmt.Errorf("invalid bid tx; failed to get auction bid info: %w", err)
}
// Bid txs cannot be included in bundled txs.
// bid txs cannot be included in bundled txs
bidInfo, _ := handler.mempool.GetAuctionBidInfo(bundledTx)
if bidInfo != nil {
return gasInfo, fmt.Errorf("invalid bid tx; bundled tx cannot be a bid tx")
}

70
blockbuster/abci.go Normal file
View File

@ -0,0 +1,70 @@
package blockbuster
import (
"crypto/sha256"
"encoding/hex"
abci "github.com/cometbft/cometbft/abci/types"
"github.com/cometbft/cometbft/libs/log"
sdk "github.com/cosmos/cosmos-sdk/types"
)
type ProposalHandler struct {
logger log.Logger
mempool Mempool
}
func NewProposalHandler(logger log.Logger, mempool Mempool) *ProposalHandler {
return &ProposalHandler{
logger: logger,
mempool: mempool,
}
}
func (h *ProposalHandler) PrepareProposalHandler() sdk.PrepareProposalHandler {
return func(ctx sdk.Context, req abci.RequestPrepareProposal) abci.ResponsePrepareProposal {
var (
selectedTxs = make(map[string][]byte)
totalTxBytes int64
)
for _, l := range h.mempool.registry {
if totalTxBytes < req.MaxTxBytes {
laneTxs, err := l.PrepareLane(ctx, req.MaxTxBytes, selectedTxs)
if err != nil {
h.logger.Error("failed to prepare lane; skipping", "lane", l.Name(), "err", err)
continue
}
for _, txBz := range laneTxs {
totalTxBytes += int64(len(txBz))
txHash := sha256.Sum256(txBz)
txHashStr := hex.EncodeToString(txHash[:])
selectedTxs[txHashStr] = txBz
}
}
}
proposalTxs := make([][]byte, 0, len(selectedTxs))
for _, txBz := range selectedTxs {
proposalTxs = append(proposalTxs, txBz)
}
return abci.ResponsePrepareProposal{Txs: proposalTxs}
}
}
func (h *ProposalHandler) ProcessProposalHandler() sdk.ProcessProposalHandler {
return func(ctx sdk.Context, req abci.RequestProcessProposal) abci.ResponseProcessProposal {
for _, l := range h.mempool.registry {
if err := l.ProcessLane(ctx, req.Txs); err != nil {
h.logger.Error("failed to process lane", "lane", l.Name(), "err", err)
return abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_REJECT}
}
}
return abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_ACCEPT}
}
}

42
blockbuster/lane.go Normal file
View File

@ -0,0 +1,42 @@
package blockbuster
import (
sdk "github.com/cosmos/cosmos-sdk/types"
sdkmempool "github.com/cosmos/cosmos-sdk/types/mempool"
"github.com/skip-mev/pob/mempool"
)
type (
// LaneConfig defines the configuration for a lane.
LaneConfig[C comparable] struct {
// XXX: For now we use the PriorityNonceMempoolConfig as the base config,
// which should be removed once Cosmos SDK v0.48 is released.
mempool.PriorityNonceMempoolConfig[C]
}
// Lane defines an interface used for block construction
Lane interface {
sdkmempool.Mempool
// Name returns the name of the lane.
Name() string
// Match determines if a transaction belongs to this lane.
Match(tx sdk.Tx) bool
// VerifyTx verifies the transaction belonging to this lane.
VerifyTx(ctx sdk.Context, tx sdk.Tx) error
// Contains returns true if the mempool contains the given transaction.
Contains(tx sdk.Tx) (bool, error)
// PrepareLane which builds a portion of the block. Inputs include the max
// number of bytes that can be included in the block and the selected transactions
// thus from from previous lane(s) as mapping from their HEX-encoded hash to
// the raw transaction.
PrepareLane(ctx sdk.Context, maxTxBytes int64, selectedTxs map[string][]byte) ([][]byte, error)
// ProcessLane which verifies the lane's portion of a proposed block.
ProcessLane(ctx sdk.Context, proposalTxs [][]byte) error
}
)

81
blockbuster/mempool.go Normal file
View File

@ -0,0 +1,81 @@
package blockbuster
import (
"context"
"errors"
sdk "github.com/cosmos/cosmos-sdk/types"
sdkmempool "github.com/cosmos/cosmos-sdk/types/mempool"
)
var _ sdkmempool.Mempool = (*Mempool)(nil)
// Mempool defines the Blockbuster mempool implement. It contains a registry
// of lanes, which allows for customizable block proposal construction.
type Mempool struct {
registry []Lane
}
func NewMempool(lanes ...Lane) *Mempool {
return &Mempool{
registry: lanes,
}
}
// TODO: Consider using a tx cache in Mempool and returning the length of that
// cache instead of relying on lane count tracking.
func (m *Mempool) CountTx() int {
var total int
for _, lane := range m.registry {
// TODO: If a global lane exists, we assume that lane has all transactions
// and we return the total.
//
// if lane.Name() == LaneNameGlobal {
// return lane.CountTx()
// }
total += lane.CountTx()
}
return total
}
// Insert inserts a transaction into every lane that it matches. Insertion will
// be attempted on all lanes, even if an error is encountered.
func (m *Mempool) Insert(ctx context.Context, tx sdk.Tx) error {
errs := make([]error, 0, len(m.registry))
for _, lane := range m.registry {
if lane.Match(tx) {
err := lane.Insert(ctx, tx)
errs = append(errs, err)
}
}
return errors.Join(errs...)
}
// Insert returns a nil iterator.
//
// TODO:
// - Determine if it even makes sense to return an iterator. What does that even
// mean in the context where you have multiple lanes?
// - Perhaps consider implementing and returning a no-op iterator?
func (m *Mempool) Select(_ context.Context, _ [][]byte) sdkmempool.Iterator {
return nil
}
// Remove removes a transaction from every lane that it matches. Removal will be
// attempted on all lanes, even if an error is encountered.
func (m *Mempool) Remove(tx sdk.Tx) error {
errs := make([]error, 0, len(m.registry))
for _, lane := range m.registry {
if lane.Match(tx) {
err := lane.Remove(tx)
errs = append(errs, err)
}
}
return errors.Join(errs...)
}

347
blockbuster/tob_lane.go Normal file
View File

@ -0,0 +1,347 @@
package blockbuster
import (
"bytes"
"context"
"crypto/sha256"
"encoding/hex"
"errors"
"fmt"
"github.com/cometbft/cometbft/libs/log"
sdk "github.com/cosmos/cosmos-sdk/types"
sdkmempool "github.com/cosmos/cosmos-sdk/types/mempool"
"github.com/skip-mev/pob/mempool"
)
const (
// LaneNameTOB defines the name of the top-of-block auction lane.
LaneNameTOB = "tob"
)
type (
// AuctionBidInfo defines the information about a bid to the auction house.
AuctionBidInfo struct {
Bidder sdk.AccAddress
Bid sdk.Coin
Transactions [][]byte
Timeout uint64
Signers []map[string]struct{}
}
// AuctionFactory defines the interface for processing auction transactions. It is
// a wrapper around all of the functionality that each application chain must implement
// in order for auction processing to work.
AuctionFactory interface {
// WrapBundleTransaction defines a function that wraps a bundle transaction into a sdk.Tx. Since
// this is a potentially expensive operation, we allow each application chain to define how
// they want to wrap the transaction such that it is only called when necessary (i.e. when the
// transaction is being considered in the proposal handlers).
WrapBundleTransaction(tx []byte) (sdk.Tx, error)
// GetAuctionBidInfo defines a function that returns the bid info from an auction transaction.
GetAuctionBidInfo(tx sdk.Tx) (*AuctionBidInfo, error)
}
)
var _ Lane = (*TOBLane)(nil)
type TOBLane struct {
logger log.Logger
index sdkmempool.Mempool
af AuctionFactory
txEncoder sdk.TxEncoder
txDecoder sdk.TxDecoder
anteHandler sdk.AnteHandler
// txIndex is a map of all transactions in the mempool. It is used
// to quickly check if a transaction is already in the mempool.
txIndex map[string]struct{}
}
func NewTOBLane(logger log.Logger, txDecoder sdk.TxDecoder, txEncoder sdk.TxEncoder, maxTx int, af AuctionFactory, anteHandler sdk.AnteHandler) *TOBLane {
return &TOBLane{
logger: logger,
index: mempool.NewPriorityMempool(
mempool.PriorityNonceMempoolConfig[int64]{
TxPriority: mempool.NewDefaultTxPriority(),
MaxTx: maxTx,
},
),
af: af,
txEncoder: txEncoder,
txDecoder: txDecoder,
anteHandler: anteHandler,
txIndex: make(map[string]struct{}),
}
}
func (l *TOBLane) Name() string {
return LaneNameTOB
}
func (l *TOBLane) Match(tx sdk.Tx) bool {
bidInfo, err := l.af.GetAuctionBidInfo(tx)
return bidInfo != nil && err == nil
}
func (l *TOBLane) Contains(tx sdk.Tx) (bool, error) {
txHashStr, err := l.getTxHashStr(tx)
if err != nil {
return false, fmt.Errorf("failed to get tx hash string: %w", err)
}
_, ok := l.txIndex[txHashStr]
return ok, nil
}
func (l *TOBLane) VerifyTx(ctx sdk.Context, bidTx sdk.Tx) error {
bidInfo, err := l.af.GetAuctionBidInfo(bidTx)
if err != nil {
return fmt.Errorf("failed to get auction bid info: %w", err)
}
// verify the top-level bid transaction
ctx, err = l.verifyTx(ctx, bidTx)
if err != nil {
return fmt.Errorf("invalid bid tx; failed to execute ante handler: %w", err)
}
// verify all of the bundled transactions
for _, tx := range bidInfo.Transactions {
bundledTx, err := l.af.WrapBundleTransaction(tx)
if err != nil {
return fmt.Errorf("invalid bid tx; failed to decode bundled tx: %w", err)
}
// bid txs cannot be included in bundled txs
bidInfo, _ := l.af.GetAuctionBidInfo(bundledTx)
if bidInfo != nil {
return fmt.Errorf("invalid bid tx; bundled tx cannot be a bid tx")
}
if ctx, err = l.verifyTx(ctx, bundledTx); err != nil {
return fmt.Errorf("invalid bid tx; failed to execute bundled transaction: %w", err)
}
}
return nil
}
// PrepareLane which builds a portion of the block. Inputs a cache of transactions
// that have already been included by a previous lane.
func (l *TOBLane) PrepareLane(ctx sdk.Context, maxTxBytes int64, selectedTxs map[string][]byte) ([][]byte, error) {
var tmpSelectedTxs [][]byte
bidTxIterator := l.index.Select(ctx, nil)
txsToRemove := make(map[sdk.Tx]struct{}, 0)
// Attempt to select the highest bid transaction that is valid and whose
// bundled transactions are valid.
selectBidTxLoop:
for ; bidTxIterator != nil; bidTxIterator = bidTxIterator.Next() {
cacheCtx, write := ctx.CacheContext()
tmpBidTx := bidTxIterator.Tx()
// if the transaction is already in the (partial) block proposal, we skip it
txHash, err := l.getTxHashStr(tmpBidTx)
if err != nil {
return nil, fmt.Errorf("failed to get bid tx hash: %w", err)
}
if _, ok := selectedTxs[txHash]; ok {
continue selectBidTxLoop
}
bidTxBz, err := l.txEncoder(tmpBidTx)
if err != nil {
txsToRemove[tmpBidTx] = struct{}{}
continue selectBidTxLoop
}
bidTxSize := int64(len(bidTxBz))
if bidTxSize <= maxTxBytes {
if err := l.VerifyTx(cacheCtx, tmpBidTx); err != nil {
// Some transactions in the bundle may be malformed or invalid, so we
// remove the bid transaction and try the next top bid.
txsToRemove[tmpBidTx] = struct{}{}
continue selectBidTxLoop
}
bidInfo, err := l.af.GetAuctionBidInfo(tmpBidTx)
if bidInfo == nil || err != nil {
// Some transactions in the bundle may be malformed or invalid, so we
// remove the bid transaction and try the next top bid.
txsToRemove[tmpBidTx] = struct{}{}
continue selectBidTxLoop
}
// store the bytes of each ref tx as sdk.Tx bytes in order to build a valid proposal
bundledTxBz := make([][]byte, len(bidInfo.Transactions))
for index, rawRefTx := range bidInfo.Transactions {
bundleTxBz := make([]byte, len(rawRefTx))
copy(bundleTxBz, rawRefTx)
bundledTxBz[index] = rawRefTx
}
// At this point, both the bid transaction itself and all the bundled
// transactions are valid. So we select the bid transaction along with
// all the bundled transactions. We also mark these transactions as seen and
// update the total size selected thus far.
tmpSelectedTxs = append(tmpSelectedTxs, bidTxBz)
tmpSelectedTxs = append(tmpSelectedTxs, bundledTxBz...)
// Write the cache context to the original context when we know we have a
// valid top of block bundle.
write()
break selectBidTxLoop
}
txsToRemove[tmpBidTx] = struct{}{}
l.logger.Info(
"failed to select auction bid tx; tx size is too large",
"tx_size", bidTxSize,
"max_size", maxTxBytes,
)
}
// remove all invalid transactions from the mempool
for tx := range txsToRemove {
if err := l.Remove(tx); err != nil {
return nil, err
}
}
return tmpSelectedTxs, nil
}
// ProcessLane which verifies the lane's portion of a proposed block.
func (l *TOBLane) ProcessLane(ctx sdk.Context, proposalTxs [][]byte) error {
for index, txBz := range proposalTxs {
tx, err := l.txDecoder(txBz)
if err != nil {
return err
}
// skip transaction if it does not match this lane
if !l.Match(tx) {
continue
}
_, err = l.processProposalVerifyTx(ctx, txBz)
if err != nil {
return err
}
bidInfo, err := l.af.GetAuctionBidInfo(tx)
if err != nil {
return err
}
// If the transaction is an auction bid, then we need to ensure that it is
// the first transaction in the block proposal and that the order of
// transactions in the block proposal follows the order of transactions in
// the bid.
if bidInfo != nil {
if index != 0 {
return errors.New("auction bid must be the first transaction in the block proposal")
}
bundledTransactions := bidInfo.Transactions
if len(proposalTxs) < len(bundledTransactions)+1 {
return errors.New("block proposal does not contain enough transactions to match the bundled transactions in the auction bid")
}
for i, refTxRaw := range bundledTransactions {
// Wrap and then encode the bundled transaction to ensure that the underlying
// reference transaction can be processed as an sdk.Tx.
wrappedTx, err := l.af.WrapBundleTransaction(refTxRaw)
if err != nil {
return err
}
refTxBz, err := l.txEncoder(wrappedTx)
if err != nil {
return err
}
if !bytes.Equal(refTxBz, proposalTxs[i+1]) {
return errors.New("block proposal does not match the bundled transactions in the auction bid")
}
}
}
}
return nil
}
func (l *TOBLane) Insert(goCtx context.Context, tx sdk.Tx) error {
txHashStr, err := l.getTxHashStr(tx)
if err != nil {
return err
}
if err := l.index.Insert(goCtx, tx); err != nil {
return fmt.Errorf("failed to insert tx into auction index: %w", err)
}
l.txIndex[txHashStr] = struct{}{}
return nil
}
func (l *TOBLane) Select(goCtx context.Context, txs [][]byte) sdkmempool.Iterator {
return l.index.Select(goCtx, txs)
}
func (l *TOBLane) CountTx() int {
return l.index.CountTx()
}
func (l *TOBLane) Remove(tx sdk.Tx) error {
txHashStr, err := l.getTxHashStr(tx)
if err != nil {
return fmt.Errorf("failed to get tx hash string: %w", err)
}
if err := l.index.Remove(tx); err != nil && !errors.Is(err, sdkmempool.ErrTxNotFound) {
return fmt.Errorf("failed to remove invalid transaction from the mempool: %w", err)
}
delete(l.txIndex, txHashStr)
return nil
}
func (l *TOBLane) processProposalVerifyTx(ctx sdk.Context, txBz []byte) (sdk.Tx, error) {
tx, err := l.txDecoder(txBz)
if err != nil {
return nil, err
}
if _, err := l.verifyTx(ctx, tx); err != nil {
return nil, err
}
return tx, nil
}
func (l *TOBLane) verifyTx(ctx sdk.Context, tx sdk.Tx) (sdk.Context, error) {
if l.anteHandler != nil {
newCtx, err := l.anteHandler(ctx, tx, false)
return newCtx, err
}
return ctx, nil
}
// getTxHashStr returns the transaction hash string for a given transaction.
func (l *TOBLane) getTxHashStr(tx sdk.Tx) (string, error) {
txBz, err := l.txEncoder(tx)
if err != nil {
return "", fmt.Errorf("failed to encode transaction: %w", err)
}
txHash := sha256.Sum256(txBz)
txHashStr := hex.EncodeToString(txHash[:])
return txHashStr, nil
}

View File

@ -22,7 +22,7 @@ type (
// GetTopAuctionTx returns the top auction bid transaction in the mempool.
GetTopAuctionTx(ctx context.Context) sdk.Tx
// CoutnAuctionTx returns the number of auction bid transactions in the mempool.
// CountAuctionTx returns the number of auction bid transactions in the mempool.
CountAuctionTx() int
// AuctionBidSelect returns an iterator over the auction bid transactions in the mempool.