From 427194f70c698a4a1ef1cad3a3f1fec7b8bd9597 Mon Sep 17 00:00:00 2001 From: David Terpay Date: Mon, 14 Aug 2023 17:38:20 -0400 Subject: [PATCH] init --- blockbuster/abci.go | 67 +++++++ blockbuster/handlers.go | 199 +++++++++++++++++++ blockbuster/lane.go | 123 ------------ blockbuster/lane_constructor.go | 196 ++++++++++++++++++ blockbuster/lane_interface.go | 71 +++++++ blockbuster/lane_mempool.go | 116 +++++++++++ blockbuster/lanes/terminator/lane.go | 52 ++--- blockbuster/priority_nonce.go | 16 +- blockbuster/proposals.go | 23 ++- blockbuster/{abci => proposals}/abci.go | 42 ++-- blockbuster/{abci => proposals}/abci_test.go | 2 +- blockbuster/{abci => proposals}/check_tx.go | 2 +- blockbuster/types.go | 164 +++++++++++++++ blockbuster/utils/utils.go | 4 +- 14 files changed, 892 insertions(+), 185 deletions(-) create mode 100644 blockbuster/abci.go create mode 100644 blockbuster/handlers.go delete mode 100644 blockbuster/lane.go create mode 100644 blockbuster/lane_constructor.go create mode 100644 blockbuster/lane_interface.go create mode 100644 blockbuster/lane_mempool.go rename blockbuster/{abci => proposals}/abci.go (84%) rename blockbuster/{abci => proposals}/abci_test.go (99%) rename blockbuster/{abci => proposals}/check_tx.go (99%) create mode 100644 blockbuster/types.go diff --git a/blockbuster/abci.go b/blockbuster/abci.go new file mode 100644 index 0000000..70dd0d0 --- /dev/null +++ b/blockbuster/abci.go @@ -0,0 +1,67 @@ +package blockbuster + +import ( + sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/skip-mev/pob/blockbuster/utils" +) + +// PrepareLane will prepare a partial proposal for the lane. It will select transactions from the +// lane respecting the selection logic of the prepareLaneHandler. It will then update the partial +// proposal with the selected transactions. If the proposal is unable to be updated, we return an +// error. The proposal will only be modified if it passes all of the invarient checks. +func (l *LaneConstructor[C]) PrepareLane( + ctx sdk.Context, + proposal BlockProposal, + maxTxBytes int64, + next PrepareLanesHandler, +) (BlockProposal, error) { + txs, txsToRemove, err := l.prepareLaneHandler(ctx, proposal, maxTxBytes) + if err != nil { + return proposal, err + } + + // Remove all transactions that were invalid during the creation of the partial proposal. + if err := utils.RemoveTxsFromLane(txsToRemove, l); err != nil { + l.Logger().Error( + "failed to remove transactions from lane", + "lane", l.Name(), + "err", err, + ) + } + + // Update the proposal with the selected transactions. + if err := proposal.UpdateProposal(l, txs); err != nil { + return proposal, err + } + + return next(ctx, proposal) +} + +// CheckOrder checks that the ordering logic of the lane is respected given the set of transactions +// in the block proposal. If the ordering logic is not respected, we return an error. +func (l *LaneConstructor[C]) CheckOrder(ctx sdk.Context, txs []sdk.Tx) error { + return l.checkOrderHandler(ctx, txs) +} + +// ProcessLane verifies that the transactions included in the block proposal are valid respecting +// the verification logic of the lane (processLaneHandler). If the transactions are valid, we +// return the transactions that do not belong to this lane to the next lane. If the transactions +// are invalid, we return an error. +func (l *LaneConstructor[C]) ProcessLane(ctx sdk.Context, txs []sdk.Tx, next ProcessLanesHandler) (sdk.Context, error) { + remainingTxs, err := l.processLaneHandler(ctx, txs) + if err != nil { + return ctx, err + } + + return next(ctx, remainingTxs) +} + +// AnteVerifyTx verifies that the transaction is valid respecting the ante verification logic of +// of the antehandler chain. +func (l *LaneConstructor[C]) AnteVerifyTx(ctx sdk.Context, tx sdk.Tx, simulate bool) (sdk.Context, error) { + if l.cfg.AnteHandler != nil { + return l.cfg.AnteHandler(ctx, tx, simulate) + } + + return ctx, nil +} diff --git a/blockbuster/handlers.go b/blockbuster/handlers.go new file mode 100644 index 0000000..52fb5d7 --- /dev/null +++ b/blockbuster/handlers.go @@ -0,0 +1,199 @@ +package blockbuster + +import ( + "context" + "fmt" + + sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/skip-mev/pob/blockbuster/utils" +) + +// DefaultPrepareLaneHandler returns a default implementation of the PrepareLaneHandler. It +// selects all transactions in the mempool that are valid and not already in the partial +// proposal. It will continue to reap transactions until the maximum block space for this +// lane has been reached. Additionally, any transactions that are invalid will be returned. +func (l *LaneConstructor[C]) DefaultPrepareLaneHandler() PrepareLaneHandler { + return func(ctx sdk.Context, proposal BlockProposal, maxTxBytes int64) ([][]byte, []sdk.Tx, error) { + var ( + totalSize int64 + txs [][]byte + txsToRemove []sdk.Tx + ) + + // Select all transactions in the mempool that are valid and not already in the + // partial proposal. + for iterator := l.Select(ctx, nil); iterator != nil; iterator = iterator.Next() { + tx := iterator.Tx() + + txBytes, hash, err := utils.GetTxHashStr(l.TxEncoder(), tx) + if err != nil { + l.Logger().Info("failed to get hash of tx", "err", err) + + txsToRemove = append(txsToRemove, tx) + continue + } + + // Double check that the transaction belongs to this lane. + if !l.Match(ctx, tx) { + l.Logger().Info( + "failed to select tx for lane; tx does not belong to lane", + "tx_hash", hash, + "lane", l.Name(), + ) + + txsToRemove = append(txsToRemove, tx) + continue + } + + // if the transaction is already in the (partial) block proposal, we skip it. + if proposal.Contains(txBytes) { + l.Logger().Info( + "failed to select tx for lane; tx is already in proposal", + "tx_hash", hash, + "lane", l.Name(), + ) + + continue + } + + // If the transaction is too large, we break and do not attempt to include more txs. + txSize := int64(len(txBytes)) + if updatedSize := totalSize + txSize; updatedSize > maxTxBytes { + l.Logger().Info( + "tx bytes above the maximum allowed", + "lane", l.Name(), + "tx_size", txSize, + "total_size", totalSize, + "max_tx_bytes", maxTxBytes, + "tx_hash", hash, + ) + + break + } + + // Verify the transaction. + if ctx, err = l.AnteVerifyTx(ctx, tx, false); err != nil { + l.Logger().Info( + "failed to verify tx", + "tx_hash", hash, + "err", err, + ) + + txsToRemove = append(txsToRemove, tx) + continue + } + + totalSize += txSize + txs = append(txs, txBytes) + } + + return txs, txsToRemove, nil + } +} + +// DefaultProcessLaneHandler returns a default implementation of the ProcessLaneHandler. It +// verifies all transactions in the lane that matches to the lane. If any transaction +// fails to verify, the entire proposal is rejected. If the handler comes across a transaction +// that does not match the lane's matcher, it will return the remaining transactions in the +// proposal. +func (l *LaneConstructor[C]) DefaultProcessLaneHandler() ProcessLaneHandler { + return func(ctx sdk.Context, txs []sdk.Tx) ([]sdk.Tx, error) { + var err error + + // Process all transactions that match the lane's matcher. + for index, tx := range txs { + if l.Match(ctx, tx) { + if ctx, err = l.AnteVerifyTx(ctx, tx, false); err != nil { + return nil, fmt.Errorf("failed to verify tx: %w", err) + } + } else { + return txs[index:], nil + } + } + + // This means we have processed all transactions in the proposal. + return nil, nil + } +} + +// DefaultCheckOrderHandler returns a default implementation of the CheckOrderHandler. It +// ensures the following invariants: +// +// 1. All transactions that belong to this lane respect the ordering logic defined by the +// lane. +// 2. Transactions that belong to other lanes cannot be interleaved with transactions that +// belong to this lane. +func (l *LaneConstructor[C]) DefaultCheckOrderHandler() CheckOrderHandler { + return func(ctx sdk.Context, txs []sdk.Tx) error { + seenOtherLaneTx := false + + for index, tx := range txs { + if l.Match(ctx, tx) { + if seenOtherLaneTx { + return fmt.Errorf("the %s lane contains a transaction that belongs to another lane", l.Name()) + } + + // If the transactions do not respect the priority defined by the mempool, we consider the proposal + // to be invalid + if index > 0 && l.Compare(ctx, txs[index-1], tx) == -1 { + return fmt.Errorf("transaction at index %d has a higher priority than %d", index, index-1) + } + } else { + seenOtherLaneTx = true + } + } + + return nil + } +} + +// DefaultMatchHandler returns a default implementation of the MatchHandler. It matches all +// transactions. +func DefaultMatchHandler() MatchHandler { + return func(ctx sdk.Context, tx sdk.Tx) bool { + return true + } +} + +// DefaultTxPriority returns a default implementation of the TxPriority. It prioritizes +// transactions by their fee. +func DefaultTxPriority() TxPriority[string] { + return TxPriority[string]{ + GetTxPriority: func(goCtx context.Context, tx sdk.Tx) string { + feeTx, ok := tx.(sdk.FeeTx) + if !ok { + return "" + } + + return feeTx.GetFee().String() + }, + Compare: func(a, b string) int { + aCoins, _ := sdk.ParseCoinsNormalized(a) + bCoins, _ := sdk.ParseCoinsNormalized(b) + + switch { + case aCoins == nil && bCoins == nil: + return 0 + + case aCoins == nil: + return -1 + + case bCoins == nil: + return 1 + + default: + switch { + case aCoins.IsAllGT(bCoins): + return 1 + + case aCoins.IsAllLT(bCoins): + return -1 + + default: + return 0 + } + } + }, + MinValue: "", + } +} diff --git a/blockbuster/lane.go b/blockbuster/lane.go deleted file mode 100644 index c0d9bcc..0000000 --- a/blockbuster/lane.go +++ /dev/null @@ -1,123 +0,0 @@ -package blockbuster - -import ( - "fmt" - - "cosmossdk.io/log" - "cosmossdk.io/math" - sdk "github.com/cosmos/cosmos-sdk/types" - sdkmempool "github.com/cosmos/cosmos-sdk/types/mempool" -) - -type ( - // PrepareLanesHandler wraps all of the lanes Prepare function into a single chained - // function. You can think of it like an AnteHandler, but for preparing proposals in the - // context of lanes instead of modules. - PrepareLanesHandler func(ctx sdk.Context, proposal BlockProposal) (BlockProposal, error) - - // ProcessLanesHandler wraps all of the lanes Process functions into a single chained - // function. You can think of it like an AnteHandler, but for processing proposals in the - // context of lanes instead of modules. - ProcessLanesHandler func(ctx sdk.Context, txs []sdk.Tx) (sdk.Context, error) - - // BaseLaneConfig defines the basic functionality needed for a lane. - BaseLaneConfig struct { - Logger log.Logger - TxEncoder sdk.TxEncoder - TxDecoder sdk.TxDecoder - AnteHandler sdk.AnteHandler - - // MaxBlockSpace defines the relative percentage of block space that can be - // used by this lane. NOTE: If this is set to zero, then there is no limit - // on the number of transactions that can be included in the block for this - // lane (up to maxTxBytes as provided by the request). This is useful for the default lane. - MaxBlockSpace math.LegacyDec - - // IgnoreList defines the list of lanes to ignore when processing transactions. This - // is useful for when you want lanes to exist after the default lane. For example, - // say there are two lanes: default and free. The free lane should be processed after - // the default lane. In this case, the free lane should be added to the ignore list - // of the default lane. Otherwise, the transactions that belong to the free lane - // will be processed by the default lane. - IgnoreList []Lane - } - - // Lane defines an interface used for block construction - Lane interface { - sdkmempool.Mempool - - // Name returns the name of the lane. - Name() string - - // Match determines if a transaction belongs to this lane. - Match(ctx sdk.Context, tx sdk.Tx) bool - - // VerifyTx verifies the transaction belonging to this lane. - VerifyTx(ctx sdk.Context, tx sdk.Tx) error - - // Contains returns true if the mempool/lane contains the given transaction. - Contains(tx sdk.Tx) bool - - // PrepareLane builds a portion of the block. It inputs the maxTxBytes that can be - // included in the proposal for the given lane, the partial proposal, and a function - // to call the next lane in the chain. The next lane in the chain will be called with - // the updated proposal and context. - PrepareLane(ctx sdk.Context, proposal BlockProposal, maxTxBytes int64, next PrepareLanesHandler) (BlockProposal, error) - - // ProcessLaneBasic validates that transactions belonging to this lane are not misplaced - // in the block proposal. - ProcessLaneBasic(ctx sdk.Context, txs []sdk.Tx) error - - // ProcessLane verifies this lane's portion of a proposed block. It inputs the transactions - // that may belong to this lane and a function to call the next lane in the chain. The next - // lane in the chain will be called with the updated context and filtered down transactions. - ProcessLane(ctx sdk.Context, proposalTxs []sdk.Tx, next ProcessLanesHandler) (sdk.Context, error) - - // SetAnteHandler sets the lane's antehandler. - SetAnteHandler(antehander sdk.AnteHandler) - - // Logger returns the lane's logger. - Logger() log.Logger - - // GetMaxBlockSpace returns the max block space for the lane as a relative percentage. - GetMaxBlockSpace() math.LegacyDec - } -) - -// NewLaneConfig returns a new LaneConfig. This will be embedded in a lane. -func NewBaseLaneConfig( - logger log.Logger, - txEncoder sdk.TxEncoder, - txDecoder sdk.TxDecoder, - anteHandler sdk.AnteHandler, - maxBlockSpace math.LegacyDec, -) BaseLaneConfig { - return BaseLaneConfig{ - Logger: logger, - TxEncoder: txEncoder, - TxDecoder: txDecoder, - AnteHandler: anteHandler, - MaxBlockSpace: maxBlockSpace, - } -} - -// ValidateBasic validates the lane configuration. -func (c *BaseLaneConfig) ValidateBasic() error { - if c.Logger == nil { - return fmt.Errorf("logger cannot be nil") - } - - if c.TxEncoder == nil { - return fmt.Errorf("tx encoder cannot be nil") - } - - if c.TxDecoder == nil { - return fmt.Errorf("tx decoder cannot be nil") - } - - if c.MaxBlockSpace.IsNil() || c.MaxBlockSpace.IsNegative() || c.MaxBlockSpace.GT(math.LegacyOneDec()) { - return fmt.Errorf("max block space must be set to a value between 0 and 1") - } - - return nil -} diff --git a/blockbuster/lane_constructor.go b/blockbuster/lane_constructor.go new file mode 100644 index 0000000..c139978 --- /dev/null +++ b/blockbuster/lane_constructor.go @@ -0,0 +1,196 @@ +package blockbuster + +import ( + "fmt" + + "cosmossdk.io/log" + "cosmossdk.io/math" + sdk "github.com/cosmos/cosmos-sdk/types" +) + +// LaneConstructor is a generic implementation of a lane. It is meant to be used +// as a base for other lanes to be built on top of. It provides a default +// implementation of the MatchHandler, PrepareLaneHandler, ProcessLaneHandler, +// and CheckOrderHandler. To extend this lane, you must either utilize the default +// handlers or construct your own that you pass into the constructor/setters. +type LaneConstructor[C comparable] struct { + // cfg stores functionality requred to encode/decode transactions, maintains how + // many transactions are allowed in this lane's mempool, and the amount of block + // space this lane is allowed to consume. + cfg LaneConfig + + // laneName is the name of the lane. + laneName string + + // LaneMempool is the mempool that is responsible for storing transactions + // that are waiting to be processed. + LaneMempool + + // matchHandler is the function that determines whether or not a transaction + // should be processed by this lane. + matchHandler MatchHandler + + // prepareLaneHandler is the function that is called when a new proposal is being + // requested and the lane needs to submit transactions it wants included in the block. + prepareLaneHandler PrepareLaneHandler + + // checkOrderHandler is the function that is called when a new proposal is being + // verified and the lane needs to verify that the transactions included in the proposal + // respect the ordering rules of the lane and does not interleave transactions from other lanes. + checkOrderHandler CheckOrderHandler + + // processLaneHandler is the function that is called when a new proposal is being + // verified and the lane needs to verify that the transactions included in the proposal + // are valid respecting the verification logic of the lane. + processLaneHandler ProcessLaneHandler +} + +// NewLaneConstructor returns a new lane constructor. When creating this lane, the type +// of the lane must be specified. The type of the lane is directly associated with the +// type of the mempool that is used to store transactions that are waiting to be processed. +func NewLaneConstructor[C comparable]( + cfg LaneConfig, + laneName string, + laneMempool LaneMempool, + matchHandlerFn MatchHandler, +) *LaneConstructor[C] { + lane := &LaneConstructor[C]{ + cfg: cfg, + laneName: laneName, + LaneMempool: laneMempool, + matchHandler: matchHandlerFn, + } + + if err := lane.ValidateBasic(); err != nil { + panic(err) + } + + return lane +} + +// ValidateBasic ensures that the lane was constructed properly. In the case that +// the lane was not constructed with proper handlers, default handlers are set. +func (l *LaneConstructor[C]) ValidateBasic() error { + if err := l.cfg.ValidateBasic(); err != nil { + return err + } + + if l.laneName == "" { + return fmt.Errorf("lane name cannot be empty") + } + + if l.LaneMempool == nil { + return fmt.Errorf("lane mempool cannot be nil") + } + + if l.matchHandler == nil { + return fmt.Errorf("match handler cannot be nil") + } + + if l.prepareLaneHandler == nil { + l.prepareLaneHandler = l.DefaultPrepareLaneHandler() + } + + if l.processLaneHandler == nil { + l.processLaneHandler = l.DefaultProcessLaneHandler() + } + + if l.checkOrderHandler == nil { + l.checkOrderHandler = l.DefaultCheckOrderHandler() + } + + return nil +} + +// SetPrepareLaneHandler sets the prepare lane handler for the lane. This handler +// is called when a new proposal is being requested and the lane needs to submit +// transactions it wants included in the block. +func (l *LaneConstructor[C]) SetPrepareLaneHandler(prepareLaneHandler PrepareLaneHandler) { + if prepareLaneHandler == nil { + panic("prepare lane handler cannot be nil") + } + + l.prepareLaneHandler = prepareLaneHandler +} + +// SetProcessLaneHandler sets the process lane handler for the lane. This handler +// is called when a new proposal is being verified and the lane needs to verify +// that the transactions included in the proposal are valid respecting the verification +// logic of the lane. +func (l *LaneConstructor[C]) SetProcessLaneHandler(processLaneHandler ProcessLaneHandler) { + if processLaneHandler == nil { + panic("process lane handler cannot be nil") + } + + l.processLaneHandler = processLaneHandler +} + +// SetCheckOrderHandler sets the check order handler for the lane. This handler +// is called when a new proposal is being verified and the lane needs to verify +// that the transactions included in the proposal respect the ordering rules of +// the lane and does not include transactions from other lanes. +func (l *LaneConstructor[C]) SetCheckOrderHandler(checkOrderHandler CheckOrderHandler) { + if checkOrderHandler == nil { + panic("check order handler cannot be nil") + } + + l.checkOrderHandler = checkOrderHandler +} + +// Match returns true if the transaction should be processed by this lane. This +// function first determines if the transaction matches the lane and then checks +// if the transaction is on the ignore list. If the transaction is on the ignore +// list, it returns false. +func (l *LaneConstructor[C]) Match(ctx sdk.Context, tx sdk.Tx) bool { + return l.matchHandler(ctx, tx) && !l.CheckIgnoreList(ctx, tx) +} + +// CheckIgnoreList returns true if the transaction is on the ignore list. The ignore +// list is utilized to prevent transactions that should be considered in other lanes +// from being considered from this lane. +func (l *LaneConstructor[C]) CheckIgnoreList(ctx sdk.Context, tx sdk.Tx) bool { + for _, lane := range l.cfg.IgnoreList { + if lane.Match(ctx, tx) { + return true + } + } + + return false +} + +// Name returns the name of the lane. +func (l *LaneConstructor[C]) Name() string { + return l.laneName +} + +// SetIgnoreList sets the ignore list for the lane. The ignore list is a list +// of lanes that the lane should ignore when processing transactions. +func (l *LaneConstructor[C]) SetIgnoreList(lanes []Lane) { + l.cfg.IgnoreList = lanes +} + +// SetAnteHandler sets the ante handler for the lane. +func (l *LaneConstructor[C]) SetAnteHandler(anteHandler sdk.AnteHandler) { + l.cfg.AnteHandler = anteHandler +} + +// Logger returns the logger for the lane. +func (l *LaneConstructor[C]) Logger() log.Logger { + return l.cfg.Logger +} + +// TxDecoder returns the tx decoder for the lane. +func (l *LaneConstructor[C]) TxDecoder() sdk.TxDecoder { + return l.cfg.TxDecoder +} + +// TxEncoder returns the tx encoder for the lane. +func (l *LaneConstructor[C]) TxEncoder() sdk.TxEncoder { + return l.cfg.TxEncoder +} + +// GetMaxBlockSpace returns the maximum amount of block space that the lane is +// allowed to consume as a percentage of the total block space. +func (l *LaneConstructor[C]) GetMaxBlockSpace() math.LegacyDec { + return l.cfg.MaxBlockSpace +} diff --git a/blockbuster/lane_interface.go b/blockbuster/lane_interface.go new file mode 100644 index 0000000..59ee35e --- /dev/null +++ b/blockbuster/lane_interface.go @@ -0,0 +1,71 @@ +package blockbuster + +import ( + "cosmossdk.io/log" + "cosmossdk.io/math" + sdk "github.com/cosmos/cosmos-sdk/types" + sdkmempool "github.com/cosmos/cosmos-sdk/types/mempool" +) + +// LaneMempool defines the interface a lane's mempool should implement. The basic API +// is the same as the sdk.Mempool, but it also includes a Compare function that is used +// to determine the relative priority of two transactions belonging in the same lane. +// +//go:generate mockery --name LaneMempool --output ./utils/mocks --outpkg mocks --case underscore +type LaneMempool interface { + sdkmempool.Mempool + + // Compare determines the relative priority of two transactions belonging in the same lane. Compare + // will return -1 if this transaction has a lower priority than the other transaction, 0 if they have + // the same priority, and 1 if this transaction has a higher priority than the other transaction. + Compare(ctx sdk.Context, this, other sdk.Tx) int + + // Contains returns true if the transaction is contained in the mempool. + Contains(tx sdk.Tx) bool +} + +// Lane defines an interface used for matching transactions to lanes, storing transactions, +// and constructing partial blocks. +// +//go:generate mockery --name Lane --output ./utils/mocks --outpkg mocks --case underscore +type Lane interface { + LaneMempool + + // PrepareLane builds a portion of the block. It inputs the maxTxBytes that can be + // included in the proposal for the given lane, the partial proposal, and a function + // to call the next lane in the chain. The next lane in the chain will be called with + // the updated proposal and context. + PrepareLane( + ctx sdk.Context, + proposal BlockProposal, + maxTxBytes int64, + next PrepareLanesHandler, + ) (BlockProposal, error) + + // CheckOrder validates that transactions belonging to this lane are not misplaced + // in the block proposal and respect the ordering rules of the lane. + CheckOrder(ctx sdk.Context, txs []sdk.Tx) error + + // ProcessLane verifies this lane's portion of a proposed block. It inputs the transactions + // that may belong to this lane and a function to call the next lane in the chain. The next + // lane in the chain will be called with the updated context and filtered down transactions. + ProcessLane(ctx sdk.Context, proposalTxs []sdk.Tx, next ProcessLanesHandler) (sdk.Context, error) + + // GetMaxBlockSpace returns the max block space for the lane as a relative percentage. + GetMaxBlockSpace() math.LegacyDec + + // Logger returns the lane's logger. + Logger() log.Logger + + // Name returns the name of the lane. + Name() string + + // SetAnteHandler sets the lane's antehandler. + SetAnteHandler(antehander sdk.AnteHandler) + + // SetIgnoreList sets the lanes that should be ignored by this lane. + SetIgnoreList(ignoreList []Lane) + + // Match determines if a transaction belongs to this lane. + Match(ctx sdk.Context, tx sdk.Tx) bool +} diff --git a/blockbuster/lane_mempool.go b/blockbuster/lane_mempool.go new file mode 100644 index 0000000..aa3c9d4 --- /dev/null +++ b/blockbuster/lane_mempool.go @@ -0,0 +1,116 @@ +package blockbuster + +import ( + "context" + "errors" + "fmt" + + sdk "github.com/cosmos/cosmos-sdk/types" + sdkmempool "github.com/cosmos/cosmos-sdk/types/mempool" + "github.com/skip-mev/pob/blockbuster/utils" +) + +type ( + // ConstructorMempool defines a mempool that orders transactions based on the + // txPriority. The mempool is a wrapper on top of the SDK's Priority Nonce mempool. + // It include's additional helper functions that allow users to determine if a + // transaction is already in the mempool and to compare the priority of two + // transactions. + ConstructorMempool[C comparable] struct { + // index defines an index of transactions. + index sdkmempool.Mempool + + // txPriority defines the transaction priority function. It is used to + // retrieve the priority of a given transaction and to compare the priority + // of two transactions. The index utilizes this struct to order transactions + // in the mempool. + txPriority TxPriority[C] + + // txEncoder defines the sdk.Tx encoder that allows us to encode transactions + // to bytes. + txEncoder sdk.TxEncoder + + // txCache is a map of all transactions in the mempool. It is used + // to quickly check if a transaction is already in the mempool. + txCache map[string]struct{} + } +) + +// NewConstructorMempool returns a new ConstructorMempool. +func NewConstructorMempool[C comparable](txPriority TxPriority[C], txEncoder sdk.TxEncoder, maxTx int) *ConstructorMempool[C] { + return &ConstructorMempool[C]{ + index: NewPriorityMempool( + PriorityNonceMempoolConfig[C]{ + TxPriority: txPriority, + MaxTx: maxTx, + }, + ), + txPriority: txPriority, + txEncoder: txEncoder, + txCache: make(map[string]struct{}), + } +} + +// Insert inserts a transaction into the mempool. +func (cm *ConstructorMempool[C]) Insert(ctx context.Context, tx sdk.Tx) error { + if err := cm.index.Insert(ctx, tx); err != nil { + return fmt.Errorf("failed to insert tx into auction index: %w", err) + } + + _, txHashStr, err := utils.GetTxHashStr(cm.txEncoder, tx) + if err != nil { + cm.Remove(tx) + return err + } + + cm.txCache[txHashStr] = struct{}{} + + return nil +} + +// Remove removes a transaction from the mempool. +func (cm *ConstructorMempool[C]) Remove(tx sdk.Tx) error { + if err := cm.index.Remove(tx); err != nil && !errors.Is(err, sdkmempool.ErrTxNotFound) { + return fmt.Errorf("failed to remove transaction from the mempool: %w", err) + } + + _, txHashStr, err := utils.GetTxHashStr(cm.txEncoder, tx) + if err != nil { + return fmt.Errorf("failed to get tx hash string: %w", err) + } + + delete(cm.txCache, txHashStr) + + return nil +} + +// Select returns an iterator of all transactions in the mempool. NOTE: If you +// remove a transaction from the mempool while iterating over the transactions, +// the iterator will not be aware of the removal and will continue to iterate +// over the removed transaction. Be sure to reset the iterator if you remove a transaction. +func (cm *ConstructorMempool[C]) Select(ctx context.Context, txs [][]byte) sdkmempool.Iterator { + return cm.index.Select(ctx, txs) +} + +// CountTx returns the number of transactions in the mempool. +func (cm *ConstructorMempool[C]) CountTx() int { + return cm.index.CountTx() +} + +// Contains returns true if the transaction is contained in the mempool. +func (cm *ConstructorMempool[C]) Contains(tx sdk.Tx) bool { + _, txHashStr, err := utils.GetTxHashStr(cm.txEncoder, tx) + if err != nil { + return false + } + + _, ok := cm.txCache[txHashStr] + return ok +} + +// Compare determines the relative priority of two transactions belonging in the same lane. +func (cm *ConstructorMempool[C]) Compare(ctx sdk.Context, this sdk.Tx, other sdk.Tx) int { + firstPriority := cm.txPriority.GetTxPriority(ctx, this) + secondPriority := cm.txPriority.GetTxPriority(ctx, other) + return cm.txPriority.Compare(firstPriority, secondPriority) +} diff --git a/blockbuster/lanes/terminator/lane.go b/blockbuster/lanes/terminator/lane.go index 159d69a..35e1453 100644 --- a/blockbuster/lanes/terminator/lane.go +++ b/blockbuster/lanes/terminator/lane.go @@ -2,7 +2,6 @@ package terminator import ( "context" - "fmt" "cosmossdk.io/log" "cosmossdk.io/math" @@ -11,6 +10,10 @@ import ( "github.com/skip-mev/pob/blockbuster" ) +const ( + LaneName = "Terminator" +) + // Terminator Lane will get added to the chain to simplify chaining code so that we // don't need to check if next == nil further up the chain // @@ -40,26 +43,42 @@ func (t Terminator) PrepareLane(_ sdk.Context, proposal blockbuster.BlockProposa return proposal, nil } +// ValidateLaneBasic is a no-op +func (t Terminator) CheckOrder(sdk.Context, []sdk.Tx) error { + return nil +} + // ProcessLane is a no-op func (t Terminator) ProcessLane(ctx sdk.Context, _ []sdk.Tx, _ blockbuster.ProcessLanesHandler) (sdk.Context, error) { return ctx, nil } +// GetMaxBlockSpace is a no-op +func (t Terminator) GetMaxBlockSpace() math.LegacyDec { + return math.LegacyZeroDec() +} + +// Logger is a no-op +func (t Terminator) Logger() log.Logger { + return log.NewNopLogger() +} + // Name returns the name of the lane func (t Terminator) Name() string { - return "Terminator" + return LaneName } +// SetAnteHandler is a no-op +func (t Terminator) SetAnteHandler(sdk.AnteHandler) {} + +// SetIgnoreList is a no-op +func (t Terminator) SetIgnoreList([]blockbuster.Lane) {} + // Match is a no-op func (t Terminator) Match(sdk.Context, sdk.Tx) bool { return false } -// VerifyTx is a no-op -func (t Terminator) VerifyTx(sdk.Context, sdk.Tx) error { - return fmt.Errorf("Terminator lane should not be called") -} - // Contains is a no-op func (t Terminator) Contains(sdk.Tx) bool { return false @@ -85,20 +104,7 @@ func (t Terminator) Select(context.Context, [][]byte) sdkmempool.Iterator { return nil } -// ValidateLaneBasic is a no-op -func (t Terminator) ProcessLaneBasic(sdk.Context, []sdk.Tx) error { - return nil -} - -// SetLaneConfig is a no-op -func (t Terminator) SetAnteHandler(sdk.AnteHandler) {} - -// Logger is a no-op -func (t Terminator) Logger() log.Logger { - return log.NewNopLogger() -} - -// GetMaxBlockSpace is a no-op -func (t Terminator) GetMaxBlockSpace() math.LegacyDec { - return math.LegacyZeroDec() +// HasHigherPriority is a no-op +func (t Terminator) Compare(sdk.Context, sdk.Tx, sdk.Tx) int { + return 0 } diff --git a/blockbuster/priority_nonce.go b/blockbuster/priority_nonce.go index 5b55c14..b561d50 100644 --- a/blockbuster/priority_nonce.go +++ b/blockbuster/priority_nonce.go @@ -313,14 +313,16 @@ func (i *PriorityNonceIterator[C]) Next() sdkmempool.Iterator { // We've reached a transaction with a priority lower than the next highest // priority in the pool. - if i.mempool.cfg.TxPriority.Compare(key.priority, i.nextPriority) < 0 { - return i.iteratePriority() - } else if i.mempool.cfg.TxPriority.Compare(key.priority, i.nextPriority) == 0 { - // Weight is incorporated into the priority index key only (not sender index) - // so we must fetch it here from the scores map. - weight := i.mempool.scores[txMeta[C]{nonce: key.nonce, sender: key.sender}].weight - if i.mempool.cfg.TxPriority.Compare(weight, i.priorityNode.Next().Key().(txMeta[C]).weight) < 0 { + if i.priorityNode.Next() != nil { + if i.mempool.cfg.TxPriority.Compare(key.priority, i.nextPriority) < 0 { return i.iteratePriority() + } else if i.mempool.cfg.TxPriority.Compare(key.priority, i.nextPriority) == 0 { + // Weight is incorporated into the priority index key only (not sender index) + // so we must fetch it here from the scores map. + weight := i.mempool.scores[txMeta[C]{nonce: key.nonce, sender: key.sender}].weight + if i.mempool.cfg.TxPriority.Compare(weight, i.priorityNode.Next().Key().(txMeta[C]).weight) < 0 { + return i.iteratePriority() + } } } diff --git a/blockbuster/proposals.go b/blockbuster/proposals.go index 4ea94a1..95da151 100644 --- a/blockbuster/proposals.go +++ b/blockbuster/proposals.go @@ -131,14 +131,6 @@ func (p *Proposal) UpdateProposal(lane LaneProposal, partialProposalTxs [][]byte } p.totalTxBytes = updatedSize - lane.Logger().Info( - "adding transactions to proposal", - "lane", lane.Name(), - "num_txs", len(partialProposalTxs), - "total_tx_bytes", partialProposalSize, - "cumulative_size", updatedSize, - ) - p.txs = append(p.txs, partialProposalTxs...) for _, tx := range partialProposalTxs { @@ -146,8 +138,23 @@ func (p *Proposal) UpdateProposal(lane LaneProposal, partialProposalTxs [][]byte txHashStr := hex.EncodeToString(txHash[:]) p.cache[txHashStr] = struct{}{} + + lane.Logger().Info( + "added transaction to proposal", + "lane", lane.Name(), + "tx_hash", txHashStr, + "tx_bytes", len(tx), + ) } + lane.Logger().Info( + "added transactions to proposal", + "lane", lane.Name(), + "num_txs", len(partialProposalTxs), + "total_tx_bytes", partialProposalSize, + "cumulative_size", updatedSize, + ) + return nil } diff --git a/blockbuster/abci/abci.go b/blockbuster/proposals/abci.go similarity index 84% rename from blockbuster/abci/abci.go rename to blockbuster/proposals/abci.go index 50f418e..a1a0135 100644 --- a/blockbuster/abci/abci.go +++ b/blockbuster/proposals/abci.go @@ -1,6 +1,8 @@ package abci import ( + "fmt" + "cosmossdk.io/log" abci "github.com/cometbft/cometbft/abci/types" sdk "github.com/cosmos/cosmos-sdk/types" @@ -20,13 +22,14 @@ type ( } ) -// NewProposalHandler returns a new abci++ proposal handler. -func NewProposalHandler(logger log.Logger, txDecoder sdk.TxDecoder, mempool blockbuster.Mempool) *ProposalHandler { +// NewProposalHandler returns a new abci++ proposal handler. This proposal handler will +// iteratively call each of the lanes in the chain to prepare and process a proposal. +func NewProposalHandler(logger log.Logger, txDecoder sdk.TxDecoder, lanes []blockbuster.Lane) *ProposalHandler { return &ProposalHandler{ logger: logger, txDecoder: txDecoder, - prepareLanesHandler: ChainPrepareLanes(mempool.Registry()...), - processLanesHandler: ChainProcessLanes(mempool.Registry()...), + prepareLanesHandler: ChainPrepareLanes(lanes...), + processLanesHandler: ChainProcessLanes(lanes...), } } @@ -55,6 +58,7 @@ func (h *ProposalHandler) PrepareProposalHandler() sdk.PrepareProposalHandler { "prepared proposal", "num_txs", proposal.GetNumTxs(), "total_tx_bytes", proposal.GetTotalTxBytes(), + "height", req.Height, ) return &abci.ResponsePrepareProposal{ @@ -71,9 +75,11 @@ func (h *ProposalHandler) ProcessProposalHandler() sdk.ProcessProposalHandler { return func(ctx sdk.Context, req *abci.RequestProcessProposal) (resp *abci.ResponseProcessProposal, err error) { // In the case where any of the lanes panic, we recover here and return a reject status. defer func() { - if err := recover(); err != nil { - h.logger.Error("failed to process proposal", "err", err) + if rec := recover(); rec != nil { + h.logger.Error("failed to process proposal", "recover_err", rec) + resp = &abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_REJECT} + err = fmt.Errorf("failed to process proposal: %v", rec) } }() @@ -125,9 +131,14 @@ func ChainPrepareLanes(chain ...blockbuster.Lane) blockbuster.PrepareLanesHandle // Cache the context in the case where any of the lanes fail to prepare the proposal. cacheCtx, write := ctx.CacheContext() + // We utilize a recover to handle any panics or errors that occur during the preparation + // of a lane's transactions. This defer will first check if there was a panic or error + // thrown from the lane's preparation logic. If there was, we log the error, skip the lane, + // and call the next lane in the chain to the prepare the proposal. defer func() { if rec := recover(); rec != nil || err != nil { lane.Logger().Error("failed to prepare lane", "lane", lane.Name(), "err", err, "recover_error", rec) + lane.Logger().Info("skipping lane", "lane", lane.Name()) lanesRemaining := len(chain) switch { @@ -141,22 +152,13 @@ func ChainPrepareLanes(chain ...blockbuster.Lane) blockbuster.PrepareLanesHandle // is the lane that failed to prepare the proposal but the second lane in the // chain is not the terminator lane so there could potentially be more transactions // added to the proposal - maxTxBytesForLane := utils.GetMaxTxBytesForLane( - partialProposal.GetMaxTxBytes(), - partialProposal.GetTotalTxBytes(), - chain[1].GetMaxBlockSpace(), - ) - - finalProposal, err = chain[1].PrepareLane( - ctx, - partialProposal, - maxTxBytesForLane, - ChainPrepareLanes(chain[2:]...), - ) + finalProposal, err = ChainPrepareLanes(chain[1:]...)(ctx, partialProposal) } } else { // Write the cache to the context since we know that the lane successfully prepared - // the partial proposal. + // the partial proposal. State is written to in a backwards, cascading fashion. This means + // that the final context will only be updated after all other lanes have successfully + // prepared the partial proposal. write() } }() @@ -198,7 +200,7 @@ func ChainProcessLanes(chain ...blockbuster.Lane) blockbuster.ProcessLanesHandle chain[0].Logger().Info("processing lane", "lane", chain[0].Name()) - if err := chain[0].ProcessLaneBasic(ctx, proposalTxs); err != nil { + if err := chain[0].CheckOrder(ctx, proposalTxs); err != nil { chain[0].Logger().Error("failed to process lane", "lane", chain[0].Name(), "err", err) return ctx, err } diff --git a/blockbuster/abci/abci_test.go b/blockbuster/proposals/abci_test.go similarity index 99% rename from blockbuster/abci/abci_test.go rename to blockbuster/proposals/abci_test.go index 4444d4a..b778edf 100644 --- a/blockbuster/abci/abci_test.go +++ b/blockbuster/proposals/abci_test.go @@ -1,4 +1,4 @@ -package abci_test +package proposals_test import ( "math/rand" diff --git a/blockbuster/abci/check_tx.go b/blockbuster/proposals/check_tx.go similarity index 99% rename from blockbuster/abci/check_tx.go rename to blockbuster/proposals/check_tx.go index e9deaf9..7facb9c 100644 --- a/blockbuster/abci/check_tx.go +++ b/blockbuster/proposals/check_tx.go @@ -1,4 +1,4 @@ -package abci +package proposals import ( "context" diff --git a/blockbuster/types.go b/blockbuster/types.go new file mode 100644 index 0000000..d816863 --- /dev/null +++ b/blockbuster/types.go @@ -0,0 +1,164 @@ +package blockbuster + +import ( + "fmt" + + "cosmossdk.io/log" + "cosmossdk.io/math" + sdk "github.com/cosmos/cosmos-sdk/types" +) + +type ( + // MatchHandler is utilized to determine if a transaction should be included in the lane. This + // function can be a stateless or stateful check on the transaction. + MatchHandler func(ctx sdk.Context, tx sdk.Tx) bool + + // PrepareLaneHandler is responsible for preparing transactions to be included in the block from a + // given lane. Given a lane, this function should return the transactions to include in the block, + // the transactions that must be removed from the lane, and an error if one occurred. + PrepareLaneHandler func( + ctx sdk.Context, + proposal BlockProposal, + maxTxBytes int64, + ) (txsToInclude [][]byte, txsToRemove []sdk.Tx, err error) + + // ProcessLaneHandler is responsible for processing transactions that are included in a block and + // belong to a given lane. ProcessLaneHandler is executed after CheckOrderHandler so the transactions + // passed into this function SHOULD already be in order respecting the ordering rules of the lane and + // respecting the ordering rules of mempool relative to the lanes it has. + ProcessLaneHandler func(ctx sdk.Context, txs []sdk.Tx) ([]sdk.Tx, error) + + // CheckOrderHandler is responsible for checking the order of transactions that belong to a given + // lane. This handler should be used to verify that the ordering of transactions passed into the + // function respect the ordering logic of the lane (if any transactions from the lane are included). + // This function should also ensure that transactions that belong to this lane are contiguous and do + // not have any transactions from other lanes in between them. + CheckOrderHandler func(ctx sdk.Context, txs []sdk.Tx) error + + // PrepareLanesHandler wraps all of the lanes' PrepareLane function into a single chained + // function. You can think of it like an AnteHandler, but for preparing proposals in the + // context of lanes instead of modules. + PrepareLanesHandler func(ctx sdk.Context, proposal BlockProposal) (BlockProposal, error) + + // ProcessLanesHandler wraps all of the lanes' ProcessLane functions into a single chained + // function. You can think of it like an AnteHandler, but for processing proposals in the + // context of lanes instead of modules. + ProcessLanesHandler func(ctx sdk.Context, txs []sdk.Tx) (sdk.Context, error) + + // LaneConfig defines the basic functionality needed for a lane. + LaneConfig struct { + Logger log.Logger + TxEncoder sdk.TxEncoder + TxDecoder sdk.TxDecoder + AnteHandler sdk.AnteHandler + + // MaxBlockSpace defines the relative percentage of block space that can be + // used by this lane. NOTE: If this is set to zero, then there is no limit + // on the number of transactions that can be included in the block for this + // lane (up to maxTxBytes as provided by the request). This is useful for the default lane. + MaxBlockSpace math.LegacyDec + + // IgnoreList defines the list of lanes to ignore when processing transactions. This + // is useful for when you want lanes to exist after the default lane. For example, + // say there are two lanes: default and free. The free lane should be processed after + // the default lane. In this case, the free lane should be added to the ignore list + // of the default lane. Otherwise, the transactions that belong to the free lane + // will be processed by the default lane (which accepts all transactions by default). + IgnoreList []Lane + + // MaxTxs sets the maximum number of transactions allowed in the mempool with + // the semantics: + // - if MaxTx == 0, there is no cap on the number of transactions in the mempool + // - if MaxTx > 0, the mempool will cap the number of transactions it stores, + // and will prioritize transactions by their priority and sender-nonce + // (sequence number) when evicting transactions. + // - if MaxTx < 0, `Insert` is a no-op. + MaxTxs int + } +) + +// NewLaneConfig returns a new LaneConfig. This will be embedded in a lane. +func NewBaseLaneConfig( + logger log.Logger, + txEncoder sdk.TxEncoder, + txDecoder sdk.TxDecoder, + anteHandler sdk.AnteHandler, + maxBlockSpace math.LegacyDec, +) LaneConfig { + return LaneConfig{ + Logger: logger, + TxEncoder: txEncoder, + TxDecoder: txDecoder, + AnteHandler: anteHandler, + MaxBlockSpace: maxBlockSpace, + } +} + +// ValidateBasic validates the lane configuration. +func (c *LaneConfig) ValidateBasic() error { + if c.Logger == nil { + return fmt.Errorf("logger cannot be nil") + } + + if c.TxEncoder == nil { + return fmt.Errorf("tx encoder cannot be nil") + } + + if c.TxDecoder == nil { + return fmt.Errorf("tx decoder cannot be nil") + } + + if c.MaxBlockSpace.IsNil() || c.MaxBlockSpace.IsNegative() || c.MaxBlockSpace.GT(math.LegacyOneDec()) { + return fmt.Errorf("max block space must be set to a value between 0 and 1") + } + + return nil +} + +// NoOpPrepareLanesHandler returns a no-op prepare lanes handler. +// This should only be used for testing. +func NoOpPrepareLanesHandler() PrepareLanesHandler { + return func(ctx sdk.Context, proposal BlockProposal) (BlockProposal, error) { + return proposal, nil + } +} + +// NoOpPrepareLaneHandler returns a no-op prepare lane handler. +// This should only be used for testing. +func NoOpPrepareLaneHandler() PrepareLaneHandler { + return func(ctx sdk.Context, proposal BlockProposal, maxTxBytes int64) (txsToInclude [][]byte, txsToRemove []sdk.Tx, err error) { + return nil, nil, nil + } +} + +// PanicPrepareLaneHandler returns a prepare lane handler that panics. +// This should only be used for testing. +func PanicPrepareLaneHandler() PrepareLaneHandler { + return func(sdk.Context, BlockProposal, int64) (txsToInclude [][]byte, txsToRemove []sdk.Tx, err error) { + panic("panic prepare lanes handler") + } +} + +// NoOpProcessLanesHandler returns a no-op process lanes handler. +// This should only be used for testing. +func NoOpProcessLanesHandler() ProcessLanesHandler { + return func(ctx sdk.Context, txs []sdk.Tx) (sdk.Context, error) { + return ctx, nil + } +} + +// NoOpProcessLaneHandler returns a no-op process lane handler. +// This should only be used for testing. +func NoOpProcessLaneHandler() ProcessLaneHandler { + return func(ctx sdk.Context, txs []sdk.Tx) ([]sdk.Tx, error) { + return txs, nil + } +} + +// PanicProcessLanesHandler returns a process lanes handler that panics. +// This should only be used for testing. +func PanicProcessLaneHandler() ProcessLaneHandler { + return func(sdk.Context, []sdk.Tx) ([]sdk.Tx, error) { + panic("panic process lanes handler") + } +} diff --git a/blockbuster/utils/utils.go b/blockbuster/utils/utils.go index 6edf364..a4b2ae0 100644 --- a/blockbuster/utils/utils.go +++ b/blockbuster/utils/utils.go @@ -40,8 +40,8 @@ func GetDecodedTxs(txDecoder sdk.TxDecoder, txs [][]byte) ([]sdk.Tx, error) { } // RemoveTxsFromLane removes the transactions from the given lane's mempool. -func RemoveTxsFromLane(txs map[sdk.Tx]struct{}, mempool sdkmempool.Mempool) error { - for tx := range txs { +func RemoveTxsFromLane(txs []sdk.Tx, mempool sdkmempool.Mempool) error { + for _, tx := range txs { if err := mempool.Remove(tx); err != nil { return err }