This commit is contained in:
David Terpay 2023-08-14 17:38:20 -04:00
parent 1ed1adc856
commit 427194f70c
No known key found for this signature in database
GPG Key ID: 627EFB00DADF0CD1
14 changed files with 892 additions and 185 deletions

67
blockbuster/abci.go Normal file
View File

@ -0,0 +1,67 @@
package blockbuster
import (
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/skip-mev/pob/blockbuster/utils"
)
// PrepareLane will prepare a partial proposal for the lane. It will select transactions from the
// lane respecting the selection logic of the prepareLaneHandler. It will then update the partial
// proposal with the selected transactions. If the proposal is unable to be updated, we return an
// error. The proposal will only be modified if it passes all of the invarient checks.
func (l *LaneConstructor[C]) PrepareLane(
ctx sdk.Context,
proposal BlockProposal,
maxTxBytes int64,
next PrepareLanesHandler,
) (BlockProposal, error) {
txs, txsToRemove, err := l.prepareLaneHandler(ctx, proposal, maxTxBytes)
if err != nil {
return proposal, err
}
// Remove all transactions that were invalid during the creation of the partial proposal.
if err := utils.RemoveTxsFromLane(txsToRemove, l); err != nil {
l.Logger().Error(
"failed to remove transactions from lane",
"lane", l.Name(),
"err", err,
)
}
// Update the proposal with the selected transactions.
if err := proposal.UpdateProposal(l, txs); err != nil {
return proposal, err
}
return next(ctx, proposal)
}
// CheckOrder checks that the ordering logic of the lane is respected given the set of transactions
// in the block proposal. If the ordering logic is not respected, we return an error.
func (l *LaneConstructor[C]) CheckOrder(ctx sdk.Context, txs []sdk.Tx) error {
return l.checkOrderHandler(ctx, txs)
}
// ProcessLane verifies that the transactions included in the block proposal are valid respecting
// the verification logic of the lane (processLaneHandler). If the transactions are valid, we
// return the transactions that do not belong to this lane to the next lane. If the transactions
// are invalid, we return an error.
func (l *LaneConstructor[C]) ProcessLane(ctx sdk.Context, txs []sdk.Tx, next ProcessLanesHandler) (sdk.Context, error) {
remainingTxs, err := l.processLaneHandler(ctx, txs)
if err != nil {
return ctx, err
}
return next(ctx, remainingTxs)
}
// AnteVerifyTx verifies that the transaction is valid respecting the ante verification logic of
// of the antehandler chain.
func (l *LaneConstructor[C]) AnteVerifyTx(ctx sdk.Context, tx sdk.Tx, simulate bool) (sdk.Context, error) {
if l.cfg.AnteHandler != nil {
return l.cfg.AnteHandler(ctx, tx, simulate)
}
return ctx, nil
}

199
blockbuster/handlers.go Normal file
View File

@ -0,0 +1,199 @@
package blockbuster
import (
"context"
"fmt"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/skip-mev/pob/blockbuster/utils"
)
// DefaultPrepareLaneHandler returns a default implementation of the PrepareLaneHandler. It
// selects all transactions in the mempool that are valid and not already in the partial
// proposal. It will continue to reap transactions until the maximum block space for this
// lane has been reached. Additionally, any transactions that are invalid will be returned.
func (l *LaneConstructor[C]) DefaultPrepareLaneHandler() PrepareLaneHandler {
return func(ctx sdk.Context, proposal BlockProposal, maxTxBytes int64) ([][]byte, []sdk.Tx, error) {
var (
totalSize int64
txs [][]byte
txsToRemove []sdk.Tx
)
// Select all transactions in the mempool that are valid and not already in the
// partial proposal.
for iterator := l.Select(ctx, nil); iterator != nil; iterator = iterator.Next() {
tx := iterator.Tx()
txBytes, hash, err := utils.GetTxHashStr(l.TxEncoder(), tx)
if err != nil {
l.Logger().Info("failed to get hash of tx", "err", err)
txsToRemove = append(txsToRemove, tx)
continue
}
// Double check that the transaction belongs to this lane.
if !l.Match(ctx, tx) {
l.Logger().Info(
"failed to select tx for lane; tx does not belong to lane",
"tx_hash", hash,
"lane", l.Name(),
)
txsToRemove = append(txsToRemove, tx)
continue
}
// if the transaction is already in the (partial) block proposal, we skip it.
if proposal.Contains(txBytes) {
l.Logger().Info(
"failed to select tx for lane; tx is already in proposal",
"tx_hash", hash,
"lane", l.Name(),
)
continue
}
// If the transaction is too large, we break and do not attempt to include more txs.
txSize := int64(len(txBytes))
if updatedSize := totalSize + txSize; updatedSize > maxTxBytes {
l.Logger().Info(
"tx bytes above the maximum allowed",
"lane", l.Name(),
"tx_size", txSize,
"total_size", totalSize,
"max_tx_bytes", maxTxBytes,
"tx_hash", hash,
)
break
}
// Verify the transaction.
if ctx, err = l.AnteVerifyTx(ctx, tx, false); err != nil {
l.Logger().Info(
"failed to verify tx",
"tx_hash", hash,
"err", err,
)
txsToRemove = append(txsToRemove, tx)
continue
}
totalSize += txSize
txs = append(txs, txBytes)
}
return txs, txsToRemove, nil
}
}
// DefaultProcessLaneHandler returns a default implementation of the ProcessLaneHandler. It
// verifies all transactions in the lane that matches to the lane. If any transaction
// fails to verify, the entire proposal is rejected. If the handler comes across a transaction
// that does not match the lane's matcher, it will return the remaining transactions in the
// proposal.
func (l *LaneConstructor[C]) DefaultProcessLaneHandler() ProcessLaneHandler {
return func(ctx sdk.Context, txs []sdk.Tx) ([]sdk.Tx, error) {
var err error
// Process all transactions that match the lane's matcher.
for index, tx := range txs {
if l.Match(ctx, tx) {
if ctx, err = l.AnteVerifyTx(ctx, tx, false); err != nil {
return nil, fmt.Errorf("failed to verify tx: %w", err)
}
} else {
return txs[index:], nil
}
}
// This means we have processed all transactions in the proposal.
return nil, nil
}
}
// DefaultCheckOrderHandler returns a default implementation of the CheckOrderHandler. It
// ensures the following invariants:
//
// 1. All transactions that belong to this lane respect the ordering logic defined by the
// lane.
// 2. Transactions that belong to other lanes cannot be interleaved with transactions that
// belong to this lane.
func (l *LaneConstructor[C]) DefaultCheckOrderHandler() CheckOrderHandler {
return func(ctx sdk.Context, txs []sdk.Tx) error {
seenOtherLaneTx := false
for index, tx := range txs {
if l.Match(ctx, tx) {
if seenOtherLaneTx {
return fmt.Errorf("the %s lane contains a transaction that belongs to another lane", l.Name())
}
// If the transactions do not respect the priority defined by the mempool, we consider the proposal
// to be invalid
if index > 0 && l.Compare(ctx, txs[index-1], tx) == -1 {
return fmt.Errorf("transaction at index %d has a higher priority than %d", index, index-1)
}
} else {
seenOtherLaneTx = true
}
}
return nil
}
}
// DefaultMatchHandler returns a default implementation of the MatchHandler. It matches all
// transactions.
func DefaultMatchHandler() MatchHandler {
return func(ctx sdk.Context, tx sdk.Tx) bool {
return true
}
}
// DefaultTxPriority returns a default implementation of the TxPriority. It prioritizes
// transactions by their fee.
func DefaultTxPriority() TxPriority[string] {
return TxPriority[string]{
GetTxPriority: func(goCtx context.Context, tx sdk.Tx) string {
feeTx, ok := tx.(sdk.FeeTx)
if !ok {
return ""
}
return feeTx.GetFee().String()
},
Compare: func(a, b string) int {
aCoins, _ := sdk.ParseCoinsNormalized(a)
bCoins, _ := sdk.ParseCoinsNormalized(b)
switch {
case aCoins == nil && bCoins == nil:
return 0
case aCoins == nil:
return -1
case bCoins == nil:
return 1
default:
switch {
case aCoins.IsAllGT(bCoins):
return 1
case aCoins.IsAllLT(bCoins):
return -1
default:
return 0
}
}
},
MinValue: "",
}
}

View File

@ -1,123 +0,0 @@
package blockbuster
import (
"fmt"
"cosmossdk.io/log"
"cosmossdk.io/math"
sdk "github.com/cosmos/cosmos-sdk/types"
sdkmempool "github.com/cosmos/cosmos-sdk/types/mempool"
)
type (
// PrepareLanesHandler wraps all of the lanes Prepare function into a single chained
// function. You can think of it like an AnteHandler, but for preparing proposals in the
// context of lanes instead of modules.
PrepareLanesHandler func(ctx sdk.Context, proposal BlockProposal) (BlockProposal, error)
// ProcessLanesHandler wraps all of the lanes Process functions into a single chained
// function. You can think of it like an AnteHandler, but for processing proposals in the
// context of lanes instead of modules.
ProcessLanesHandler func(ctx sdk.Context, txs []sdk.Tx) (sdk.Context, error)
// BaseLaneConfig defines the basic functionality needed for a lane.
BaseLaneConfig struct {
Logger log.Logger
TxEncoder sdk.TxEncoder
TxDecoder sdk.TxDecoder
AnteHandler sdk.AnteHandler
// MaxBlockSpace defines the relative percentage of block space that can be
// used by this lane. NOTE: If this is set to zero, then there is no limit
// on the number of transactions that can be included in the block for this
// lane (up to maxTxBytes as provided by the request). This is useful for the default lane.
MaxBlockSpace math.LegacyDec
// IgnoreList defines the list of lanes to ignore when processing transactions. This
// is useful for when you want lanes to exist after the default lane. For example,
// say there are two lanes: default and free. The free lane should be processed after
// the default lane. In this case, the free lane should be added to the ignore list
// of the default lane. Otherwise, the transactions that belong to the free lane
// will be processed by the default lane.
IgnoreList []Lane
}
// Lane defines an interface used for block construction
Lane interface {
sdkmempool.Mempool
// Name returns the name of the lane.
Name() string
// Match determines if a transaction belongs to this lane.
Match(ctx sdk.Context, tx sdk.Tx) bool
// VerifyTx verifies the transaction belonging to this lane.
VerifyTx(ctx sdk.Context, tx sdk.Tx) error
// Contains returns true if the mempool/lane contains the given transaction.
Contains(tx sdk.Tx) bool
// PrepareLane builds a portion of the block. It inputs the maxTxBytes that can be
// included in the proposal for the given lane, the partial proposal, and a function
// to call the next lane in the chain. The next lane in the chain will be called with
// the updated proposal and context.
PrepareLane(ctx sdk.Context, proposal BlockProposal, maxTxBytes int64, next PrepareLanesHandler) (BlockProposal, error)
// ProcessLaneBasic validates that transactions belonging to this lane are not misplaced
// in the block proposal.
ProcessLaneBasic(ctx sdk.Context, txs []sdk.Tx) error
// ProcessLane verifies this lane's portion of a proposed block. It inputs the transactions
// that may belong to this lane and a function to call the next lane in the chain. The next
// lane in the chain will be called with the updated context and filtered down transactions.
ProcessLane(ctx sdk.Context, proposalTxs []sdk.Tx, next ProcessLanesHandler) (sdk.Context, error)
// SetAnteHandler sets the lane's antehandler.
SetAnteHandler(antehander sdk.AnteHandler)
// Logger returns the lane's logger.
Logger() log.Logger
// GetMaxBlockSpace returns the max block space for the lane as a relative percentage.
GetMaxBlockSpace() math.LegacyDec
}
)
// NewLaneConfig returns a new LaneConfig. This will be embedded in a lane.
func NewBaseLaneConfig(
logger log.Logger,
txEncoder sdk.TxEncoder,
txDecoder sdk.TxDecoder,
anteHandler sdk.AnteHandler,
maxBlockSpace math.LegacyDec,
) BaseLaneConfig {
return BaseLaneConfig{
Logger: logger,
TxEncoder: txEncoder,
TxDecoder: txDecoder,
AnteHandler: anteHandler,
MaxBlockSpace: maxBlockSpace,
}
}
// ValidateBasic validates the lane configuration.
func (c *BaseLaneConfig) ValidateBasic() error {
if c.Logger == nil {
return fmt.Errorf("logger cannot be nil")
}
if c.TxEncoder == nil {
return fmt.Errorf("tx encoder cannot be nil")
}
if c.TxDecoder == nil {
return fmt.Errorf("tx decoder cannot be nil")
}
if c.MaxBlockSpace.IsNil() || c.MaxBlockSpace.IsNegative() || c.MaxBlockSpace.GT(math.LegacyOneDec()) {
return fmt.Errorf("max block space must be set to a value between 0 and 1")
}
return nil
}

View File

@ -0,0 +1,196 @@
package blockbuster
import (
"fmt"
"cosmossdk.io/log"
"cosmossdk.io/math"
sdk "github.com/cosmos/cosmos-sdk/types"
)
// LaneConstructor is a generic implementation of a lane. It is meant to be used
// as a base for other lanes to be built on top of. It provides a default
// implementation of the MatchHandler, PrepareLaneHandler, ProcessLaneHandler,
// and CheckOrderHandler. To extend this lane, you must either utilize the default
// handlers or construct your own that you pass into the constructor/setters.
type LaneConstructor[C comparable] struct {
// cfg stores functionality requred to encode/decode transactions, maintains how
// many transactions are allowed in this lane's mempool, and the amount of block
// space this lane is allowed to consume.
cfg LaneConfig
// laneName is the name of the lane.
laneName string
// LaneMempool is the mempool that is responsible for storing transactions
// that are waiting to be processed.
LaneMempool
// matchHandler is the function that determines whether or not a transaction
// should be processed by this lane.
matchHandler MatchHandler
// prepareLaneHandler is the function that is called when a new proposal is being
// requested and the lane needs to submit transactions it wants included in the block.
prepareLaneHandler PrepareLaneHandler
// checkOrderHandler is the function that is called when a new proposal is being
// verified and the lane needs to verify that the transactions included in the proposal
// respect the ordering rules of the lane and does not interleave transactions from other lanes.
checkOrderHandler CheckOrderHandler
// processLaneHandler is the function that is called when a new proposal is being
// verified and the lane needs to verify that the transactions included in the proposal
// are valid respecting the verification logic of the lane.
processLaneHandler ProcessLaneHandler
}
// NewLaneConstructor returns a new lane constructor. When creating this lane, the type
// of the lane must be specified. The type of the lane is directly associated with the
// type of the mempool that is used to store transactions that are waiting to be processed.
func NewLaneConstructor[C comparable](
cfg LaneConfig,
laneName string,
laneMempool LaneMempool,
matchHandlerFn MatchHandler,
) *LaneConstructor[C] {
lane := &LaneConstructor[C]{
cfg: cfg,
laneName: laneName,
LaneMempool: laneMempool,
matchHandler: matchHandlerFn,
}
if err := lane.ValidateBasic(); err != nil {
panic(err)
}
return lane
}
// ValidateBasic ensures that the lane was constructed properly. In the case that
// the lane was not constructed with proper handlers, default handlers are set.
func (l *LaneConstructor[C]) ValidateBasic() error {
if err := l.cfg.ValidateBasic(); err != nil {
return err
}
if l.laneName == "" {
return fmt.Errorf("lane name cannot be empty")
}
if l.LaneMempool == nil {
return fmt.Errorf("lane mempool cannot be nil")
}
if l.matchHandler == nil {
return fmt.Errorf("match handler cannot be nil")
}
if l.prepareLaneHandler == nil {
l.prepareLaneHandler = l.DefaultPrepareLaneHandler()
}
if l.processLaneHandler == nil {
l.processLaneHandler = l.DefaultProcessLaneHandler()
}
if l.checkOrderHandler == nil {
l.checkOrderHandler = l.DefaultCheckOrderHandler()
}
return nil
}
// SetPrepareLaneHandler sets the prepare lane handler for the lane. This handler
// is called when a new proposal is being requested and the lane needs to submit
// transactions it wants included in the block.
func (l *LaneConstructor[C]) SetPrepareLaneHandler(prepareLaneHandler PrepareLaneHandler) {
if prepareLaneHandler == nil {
panic("prepare lane handler cannot be nil")
}
l.prepareLaneHandler = prepareLaneHandler
}
// SetProcessLaneHandler sets the process lane handler for the lane. This handler
// is called when a new proposal is being verified and the lane needs to verify
// that the transactions included in the proposal are valid respecting the verification
// logic of the lane.
func (l *LaneConstructor[C]) SetProcessLaneHandler(processLaneHandler ProcessLaneHandler) {
if processLaneHandler == nil {
panic("process lane handler cannot be nil")
}
l.processLaneHandler = processLaneHandler
}
// SetCheckOrderHandler sets the check order handler for the lane. This handler
// is called when a new proposal is being verified and the lane needs to verify
// that the transactions included in the proposal respect the ordering rules of
// the lane and does not include transactions from other lanes.
func (l *LaneConstructor[C]) SetCheckOrderHandler(checkOrderHandler CheckOrderHandler) {
if checkOrderHandler == nil {
panic("check order handler cannot be nil")
}
l.checkOrderHandler = checkOrderHandler
}
// Match returns true if the transaction should be processed by this lane. This
// function first determines if the transaction matches the lane and then checks
// if the transaction is on the ignore list. If the transaction is on the ignore
// list, it returns false.
func (l *LaneConstructor[C]) Match(ctx sdk.Context, tx sdk.Tx) bool {
return l.matchHandler(ctx, tx) && !l.CheckIgnoreList(ctx, tx)
}
// CheckIgnoreList returns true if the transaction is on the ignore list. The ignore
// list is utilized to prevent transactions that should be considered in other lanes
// from being considered from this lane.
func (l *LaneConstructor[C]) CheckIgnoreList(ctx sdk.Context, tx sdk.Tx) bool {
for _, lane := range l.cfg.IgnoreList {
if lane.Match(ctx, tx) {
return true
}
}
return false
}
// Name returns the name of the lane.
func (l *LaneConstructor[C]) Name() string {
return l.laneName
}
// SetIgnoreList sets the ignore list for the lane. The ignore list is a list
// of lanes that the lane should ignore when processing transactions.
func (l *LaneConstructor[C]) SetIgnoreList(lanes []Lane) {
l.cfg.IgnoreList = lanes
}
// SetAnteHandler sets the ante handler for the lane.
func (l *LaneConstructor[C]) SetAnteHandler(anteHandler sdk.AnteHandler) {
l.cfg.AnteHandler = anteHandler
}
// Logger returns the logger for the lane.
func (l *LaneConstructor[C]) Logger() log.Logger {
return l.cfg.Logger
}
// TxDecoder returns the tx decoder for the lane.
func (l *LaneConstructor[C]) TxDecoder() sdk.TxDecoder {
return l.cfg.TxDecoder
}
// TxEncoder returns the tx encoder for the lane.
func (l *LaneConstructor[C]) TxEncoder() sdk.TxEncoder {
return l.cfg.TxEncoder
}
// GetMaxBlockSpace returns the maximum amount of block space that the lane is
// allowed to consume as a percentage of the total block space.
func (l *LaneConstructor[C]) GetMaxBlockSpace() math.LegacyDec {
return l.cfg.MaxBlockSpace
}

View File

@ -0,0 +1,71 @@
package blockbuster
import (
"cosmossdk.io/log"
"cosmossdk.io/math"
sdk "github.com/cosmos/cosmos-sdk/types"
sdkmempool "github.com/cosmos/cosmos-sdk/types/mempool"
)
// LaneMempool defines the interface a lane's mempool should implement. The basic API
// is the same as the sdk.Mempool, but it also includes a Compare function that is used
// to determine the relative priority of two transactions belonging in the same lane.
//
//go:generate mockery --name LaneMempool --output ./utils/mocks --outpkg mocks --case underscore
type LaneMempool interface {
sdkmempool.Mempool
// Compare determines the relative priority of two transactions belonging in the same lane. Compare
// will return -1 if this transaction has a lower priority than the other transaction, 0 if they have
// the same priority, and 1 if this transaction has a higher priority than the other transaction.
Compare(ctx sdk.Context, this, other sdk.Tx) int
// Contains returns true if the transaction is contained in the mempool.
Contains(tx sdk.Tx) bool
}
// Lane defines an interface used for matching transactions to lanes, storing transactions,
// and constructing partial blocks.
//
//go:generate mockery --name Lane --output ./utils/mocks --outpkg mocks --case underscore
type Lane interface {
LaneMempool
// PrepareLane builds a portion of the block. It inputs the maxTxBytes that can be
// included in the proposal for the given lane, the partial proposal, and a function
// to call the next lane in the chain. The next lane in the chain will be called with
// the updated proposal and context.
PrepareLane(
ctx sdk.Context,
proposal BlockProposal,
maxTxBytes int64,
next PrepareLanesHandler,
) (BlockProposal, error)
// CheckOrder validates that transactions belonging to this lane are not misplaced
// in the block proposal and respect the ordering rules of the lane.
CheckOrder(ctx sdk.Context, txs []sdk.Tx) error
// ProcessLane verifies this lane's portion of a proposed block. It inputs the transactions
// that may belong to this lane and a function to call the next lane in the chain. The next
// lane in the chain will be called with the updated context and filtered down transactions.
ProcessLane(ctx sdk.Context, proposalTxs []sdk.Tx, next ProcessLanesHandler) (sdk.Context, error)
// GetMaxBlockSpace returns the max block space for the lane as a relative percentage.
GetMaxBlockSpace() math.LegacyDec
// Logger returns the lane's logger.
Logger() log.Logger
// Name returns the name of the lane.
Name() string
// SetAnteHandler sets the lane's antehandler.
SetAnteHandler(antehander sdk.AnteHandler)
// SetIgnoreList sets the lanes that should be ignored by this lane.
SetIgnoreList(ignoreList []Lane)
// Match determines if a transaction belongs to this lane.
Match(ctx sdk.Context, tx sdk.Tx) bool
}

116
blockbuster/lane_mempool.go Normal file
View File

@ -0,0 +1,116 @@
package blockbuster
import (
"context"
"errors"
"fmt"
sdk "github.com/cosmos/cosmos-sdk/types"
sdkmempool "github.com/cosmos/cosmos-sdk/types/mempool"
"github.com/skip-mev/pob/blockbuster/utils"
)
type (
// ConstructorMempool defines a mempool that orders transactions based on the
// txPriority. The mempool is a wrapper on top of the SDK's Priority Nonce mempool.
// It include's additional helper functions that allow users to determine if a
// transaction is already in the mempool and to compare the priority of two
// transactions.
ConstructorMempool[C comparable] struct {
// index defines an index of transactions.
index sdkmempool.Mempool
// txPriority defines the transaction priority function. It is used to
// retrieve the priority of a given transaction and to compare the priority
// of two transactions. The index utilizes this struct to order transactions
// in the mempool.
txPriority TxPriority[C]
// txEncoder defines the sdk.Tx encoder that allows us to encode transactions
// to bytes.
txEncoder sdk.TxEncoder
// txCache is a map of all transactions in the mempool. It is used
// to quickly check if a transaction is already in the mempool.
txCache map[string]struct{}
}
)
// NewConstructorMempool returns a new ConstructorMempool.
func NewConstructorMempool[C comparable](txPriority TxPriority[C], txEncoder sdk.TxEncoder, maxTx int) *ConstructorMempool[C] {
return &ConstructorMempool[C]{
index: NewPriorityMempool(
PriorityNonceMempoolConfig[C]{
TxPriority: txPriority,
MaxTx: maxTx,
},
),
txPriority: txPriority,
txEncoder: txEncoder,
txCache: make(map[string]struct{}),
}
}
// Insert inserts a transaction into the mempool.
func (cm *ConstructorMempool[C]) Insert(ctx context.Context, tx sdk.Tx) error {
if err := cm.index.Insert(ctx, tx); err != nil {
return fmt.Errorf("failed to insert tx into auction index: %w", err)
}
_, txHashStr, err := utils.GetTxHashStr(cm.txEncoder, tx)
if err != nil {
cm.Remove(tx)
return err
}
cm.txCache[txHashStr] = struct{}{}
return nil
}
// Remove removes a transaction from the mempool.
func (cm *ConstructorMempool[C]) Remove(tx sdk.Tx) error {
if err := cm.index.Remove(tx); err != nil && !errors.Is(err, sdkmempool.ErrTxNotFound) {
return fmt.Errorf("failed to remove transaction from the mempool: %w", err)
}
_, txHashStr, err := utils.GetTxHashStr(cm.txEncoder, tx)
if err != nil {
return fmt.Errorf("failed to get tx hash string: %w", err)
}
delete(cm.txCache, txHashStr)
return nil
}
// Select returns an iterator of all transactions in the mempool. NOTE: If you
// remove a transaction from the mempool while iterating over the transactions,
// the iterator will not be aware of the removal and will continue to iterate
// over the removed transaction. Be sure to reset the iterator if you remove a transaction.
func (cm *ConstructorMempool[C]) Select(ctx context.Context, txs [][]byte) sdkmempool.Iterator {
return cm.index.Select(ctx, txs)
}
// CountTx returns the number of transactions in the mempool.
func (cm *ConstructorMempool[C]) CountTx() int {
return cm.index.CountTx()
}
// Contains returns true if the transaction is contained in the mempool.
func (cm *ConstructorMempool[C]) Contains(tx sdk.Tx) bool {
_, txHashStr, err := utils.GetTxHashStr(cm.txEncoder, tx)
if err != nil {
return false
}
_, ok := cm.txCache[txHashStr]
return ok
}
// Compare determines the relative priority of two transactions belonging in the same lane.
func (cm *ConstructorMempool[C]) Compare(ctx sdk.Context, this sdk.Tx, other sdk.Tx) int {
firstPriority := cm.txPriority.GetTxPriority(ctx, this)
secondPriority := cm.txPriority.GetTxPriority(ctx, other)
return cm.txPriority.Compare(firstPriority, secondPriority)
}

View File

@ -2,7 +2,6 @@ package terminator
import (
"context"
"fmt"
"cosmossdk.io/log"
"cosmossdk.io/math"
@ -11,6 +10,10 @@ import (
"github.com/skip-mev/pob/blockbuster"
)
const (
LaneName = "Terminator"
)
// Terminator Lane will get added to the chain to simplify chaining code so that we
// don't need to check if next == nil further up the chain
//
@ -40,26 +43,42 @@ func (t Terminator) PrepareLane(_ sdk.Context, proposal blockbuster.BlockProposa
return proposal, nil
}
// ValidateLaneBasic is a no-op
func (t Terminator) CheckOrder(sdk.Context, []sdk.Tx) error {
return nil
}
// ProcessLane is a no-op
func (t Terminator) ProcessLane(ctx sdk.Context, _ []sdk.Tx, _ blockbuster.ProcessLanesHandler) (sdk.Context, error) {
return ctx, nil
}
// GetMaxBlockSpace is a no-op
func (t Terminator) GetMaxBlockSpace() math.LegacyDec {
return math.LegacyZeroDec()
}
// Logger is a no-op
func (t Terminator) Logger() log.Logger {
return log.NewNopLogger()
}
// Name returns the name of the lane
func (t Terminator) Name() string {
return "Terminator"
return LaneName
}
// SetAnteHandler is a no-op
func (t Terminator) SetAnteHandler(sdk.AnteHandler) {}
// SetIgnoreList is a no-op
func (t Terminator) SetIgnoreList([]blockbuster.Lane) {}
// Match is a no-op
func (t Terminator) Match(sdk.Context, sdk.Tx) bool {
return false
}
// VerifyTx is a no-op
func (t Terminator) VerifyTx(sdk.Context, sdk.Tx) error {
return fmt.Errorf("Terminator lane should not be called")
}
// Contains is a no-op
func (t Terminator) Contains(sdk.Tx) bool {
return false
@ -85,20 +104,7 @@ func (t Terminator) Select(context.Context, [][]byte) sdkmempool.Iterator {
return nil
}
// ValidateLaneBasic is a no-op
func (t Terminator) ProcessLaneBasic(sdk.Context, []sdk.Tx) error {
return nil
}
// SetLaneConfig is a no-op
func (t Terminator) SetAnteHandler(sdk.AnteHandler) {}
// Logger is a no-op
func (t Terminator) Logger() log.Logger {
return log.NewNopLogger()
}
// GetMaxBlockSpace is a no-op
func (t Terminator) GetMaxBlockSpace() math.LegacyDec {
return math.LegacyZeroDec()
// HasHigherPriority is a no-op
func (t Terminator) Compare(sdk.Context, sdk.Tx, sdk.Tx) int {
return 0
}

View File

@ -313,14 +313,16 @@ func (i *PriorityNonceIterator[C]) Next() sdkmempool.Iterator {
// We've reached a transaction with a priority lower than the next highest
// priority in the pool.
if i.mempool.cfg.TxPriority.Compare(key.priority, i.nextPriority) < 0 {
return i.iteratePriority()
} else if i.mempool.cfg.TxPriority.Compare(key.priority, i.nextPriority) == 0 {
// Weight is incorporated into the priority index key only (not sender index)
// so we must fetch it here from the scores map.
weight := i.mempool.scores[txMeta[C]{nonce: key.nonce, sender: key.sender}].weight
if i.mempool.cfg.TxPriority.Compare(weight, i.priorityNode.Next().Key().(txMeta[C]).weight) < 0 {
if i.priorityNode.Next() != nil {
if i.mempool.cfg.TxPriority.Compare(key.priority, i.nextPriority) < 0 {
return i.iteratePriority()
} else if i.mempool.cfg.TxPriority.Compare(key.priority, i.nextPriority) == 0 {
// Weight is incorporated into the priority index key only (not sender index)
// so we must fetch it here from the scores map.
weight := i.mempool.scores[txMeta[C]{nonce: key.nonce, sender: key.sender}].weight
if i.mempool.cfg.TxPriority.Compare(weight, i.priorityNode.Next().Key().(txMeta[C]).weight) < 0 {
return i.iteratePriority()
}
}
}

View File

@ -131,14 +131,6 @@ func (p *Proposal) UpdateProposal(lane LaneProposal, partialProposalTxs [][]byte
}
p.totalTxBytes = updatedSize
lane.Logger().Info(
"adding transactions to proposal",
"lane", lane.Name(),
"num_txs", len(partialProposalTxs),
"total_tx_bytes", partialProposalSize,
"cumulative_size", updatedSize,
)
p.txs = append(p.txs, partialProposalTxs...)
for _, tx := range partialProposalTxs {
@ -146,8 +138,23 @@ func (p *Proposal) UpdateProposal(lane LaneProposal, partialProposalTxs [][]byte
txHashStr := hex.EncodeToString(txHash[:])
p.cache[txHashStr] = struct{}{}
lane.Logger().Info(
"added transaction to proposal",
"lane", lane.Name(),
"tx_hash", txHashStr,
"tx_bytes", len(tx),
)
}
lane.Logger().Info(
"added transactions to proposal",
"lane", lane.Name(),
"num_txs", len(partialProposalTxs),
"total_tx_bytes", partialProposalSize,
"cumulative_size", updatedSize,
)
return nil
}

View File

@ -1,6 +1,8 @@
package abci
import (
"fmt"
"cosmossdk.io/log"
abci "github.com/cometbft/cometbft/abci/types"
sdk "github.com/cosmos/cosmos-sdk/types"
@ -20,13 +22,14 @@ type (
}
)
// NewProposalHandler returns a new abci++ proposal handler.
func NewProposalHandler(logger log.Logger, txDecoder sdk.TxDecoder, mempool blockbuster.Mempool) *ProposalHandler {
// NewProposalHandler returns a new abci++ proposal handler. This proposal handler will
// iteratively call each of the lanes in the chain to prepare and process a proposal.
func NewProposalHandler(logger log.Logger, txDecoder sdk.TxDecoder, lanes []blockbuster.Lane) *ProposalHandler {
return &ProposalHandler{
logger: logger,
txDecoder: txDecoder,
prepareLanesHandler: ChainPrepareLanes(mempool.Registry()...),
processLanesHandler: ChainProcessLanes(mempool.Registry()...),
prepareLanesHandler: ChainPrepareLanes(lanes...),
processLanesHandler: ChainProcessLanes(lanes...),
}
}
@ -55,6 +58,7 @@ func (h *ProposalHandler) PrepareProposalHandler() sdk.PrepareProposalHandler {
"prepared proposal",
"num_txs", proposal.GetNumTxs(),
"total_tx_bytes", proposal.GetTotalTxBytes(),
"height", req.Height,
)
return &abci.ResponsePrepareProposal{
@ -71,9 +75,11 @@ func (h *ProposalHandler) ProcessProposalHandler() sdk.ProcessProposalHandler {
return func(ctx sdk.Context, req *abci.RequestProcessProposal) (resp *abci.ResponseProcessProposal, err error) {
// In the case where any of the lanes panic, we recover here and return a reject status.
defer func() {
if err := recover(); err != nil {
h.logger.Error("failed to process proposal", "err", err)
if rec := recover(); rec != nil {
h.logger.Error("failed to process proposal", "recover_err", rec)
resp = &abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_REJECT}
err = fmt.Errorf("failed to process proposal: %v", rec)
}
}()
@ -125,9 +131,14 @@ func ChainPrepareLanes(chain ...blockbuster.Lane) blockbuster.PrepareLanesHandle
// Cache the context in the case where any of the lanes fail to prepare the proposal.
cacheCtx, write := ctx.CacheContext()
// We utilize a recover to handle any panics or errors that occur during the preparation
// of a lane's transactions. This defer will first check if there was a panic or error
// thrown from the lane's preparation logic. If there was, we log the error, skip the lane,
// and call the next lane in the chain to the prepare the proposal.
defer func() {
if rec := recover(); rec != nil || err != nil {
lane.Logger().Error("failed to prepare lane", "lane", lane.Name(), "err", err, "recover_error", rec)
lane.Logger().Info("skipping lane", "lane", lane.Name())
lanesRemaining := len(chain)
switch {
@ -141,22 +152,13 @@ func ChainPrepareLanes(chain ...blockbuster.Lane) blockbuster.PrepareLanesHandle
// is the lane that failed to prepare the proposal but the second lane in the
// chain is not the terminator lane so there could potentially be more transactions
// added to the proposal
maxTxBytesForLane := utils.GetMaxTxBytesForLane(
partialProposal.GetMaxTxBytes(),
partialProposal.GetTotalTxBytes(),
chain[1].GetMaxBlockSpace(),
)
finalProposal, err = chain[1].PrepareLane(
ctx,
partialProposal,
maxTxBytesForLane,
ChainPrepareLanes(chain[2:]...),
)
finalProposal, err = ChainPrepareLanes(chain[1:]...)(ctx, partialProposal)
}
} else {
// Write the cache to the context since we know that the lane successfully prepared
// the partial proposal.
// the partial proposal. State is written to in a backwards, cascading fashion. This means
// that the final context will only be updated after all other lanes have successfully
// prepared the partial proposal.
write()
}
}()
@ -198,7 +200,7 @@ func ChainProcessLanes(chain ...blockbuster.Lane) blockbuster.ProcessLanesHandle
chain[0].Logger().Info("processing lane", "lane", chain[0].Name())
if err := chain[0].ProcessLaneBasic(ctx, proposalTxs); err != nil {
if err := chain[0].CheckOrder(ctx, proposalTxs); err != nil {
chain[0].Logger().Error("failed to process lane", "lane", chain[0].Name(), "err", err)
return ctx, err
}

View File

@ -1,4 +1,4 @@
package abci_test
package proposals_test
import (
"math/rand"

View File

@ -1,4 +1,4 @@
package abci
package proposals
import (
"context"

164
blockbuster/types.go Normal file
View File

@ -0,0 +1,164 @@
package blockbuster
import (
"fmt"
"cosmossdk.io/log"
"cosmossdk.io/math"
sdk "github.com/cosmos/cosmos-sdk/types"
)
type (
// MatchHandler is utilized to determine if a transaction should be included in the lane. This
// function can be a stateless or stateful check on the transaction.
MatchHandler func(ctx sdk.Context, tx sdk.Tx) bool
// PrepareLaneHandler is responsible for preparing transactions to be included in the block from a
// given lane. Given a lane, this function should return the transactions to include in the block,
// the transactions that must be removed from the lane, and an error if one occurred.
PrepareLaneHandler func(
ctx sdk.Context,
proposal BlockProposal,
maxTxBytes int64,
) (txsToInclude [][]byte, txsToRemove []sdk.Tx, err error)
// ProcessLaneHandler is responsible for processing transactions that are included in a block and
// belong to a given lane. ProcessLaneHandler is executed after CheckOrderHandler so the transactions
// passed into this function SHOULD already be in order respecting the ordering rules of the lane and
// respecting the ordering rules of mempool relative to the lanes it has.
ProcessLaneHandler func(ctx sdk.Context, txs []sdk.Tx) ([]sdk.Tx, error)
// CheckOrderHandler is responsible for checking the order of transactions that belong to a given
// lane. This handler should be used to verify that the ordering of transactions passed into the
// function respect the ordering logic of the lane (if any transactions from the lane are included).
// This function should also ensure that transactions that belong to this lane are contiguous and do
// not have any transactions from other lanes in between them.
CheckOrderHandler func(ctx sdk.Context, txs []sdk.Tx) error
// PrepareLanesHandler wraps all of the lanes' PrepareLane function into a single chained
// function. You can think of it like an AnteHandler, but for preparing proposals in the
// context of lanes instead of modules.
PrepareLanesHandler func(ctx sdk.Context, proposal BlockProposal) (BlockProposal, error)
// ProcessLanesHandler wraps all of the lanes' ProcessLane functions into a single chained
// function. You can think of it like an AnteHandler, but for processing proposals in the
// context of lanes instead of modules.
ProcessLanesHandler func(ctx sdk.Context, txs []sdk.Tx) (sdk.Context, error)
// LaneConfig defines the basic functionality needed for a lane.
LaneConfig struct {
Logger log.Logger
TxEncoder sdk.TxEncoder
TxDecoder sdk.TxDecoder
AnteHandler sdk.AnteHandler
// MaxBlockSpace defines the relative percentage of block space that can be
// used by this lane. NOTE: If this is set to zero, then there is no limit
// on the number of transactions that can be included in the block for this
// lane (up to maxTxBytes as provided by the request). This is useful for the default lane.
MaxBlockSpace math.LegacyDec
// IgnoreList defines the list of lanes to ignore when processing transactions. This
// is useful for when you want lanes to exist after the default lane. For example,
// say there are two lanes: default and free. The free lane should be processed after
// the default lane. In this case, the free lane should be added to the ignore list
// of the default lane. Otherwise, the transactions that belong to the free lane
// will be processed by the default lane (which accepts all transactions by default).
IgnoreList []Lane
// MaxTxs sets the maximum number of transactions allowed in the mempool with
// the semantics:
// - if MaxTx == 0, there is no cap on the number of transactions in the mempool
// - if MaxTx > 0, the mempool will cap the number of transactions it stores,
// and will prioritize transactions by their priority and sender-nonce
// (sequence number) when evicting transactions.
// - if MaxTx < 0, `Insert` is a no-op.
MaxTxs int
}
)
// NewLaneConfig returns a new LaneConfig. This will be embedded in a lane.
func NewBaseLaneConfig(
logger log.Logger,
txEncoder sdk.TxEncoder,
txDecoder sdk.TxDecoder,
anteHandler sdk.AnteHandler,
maxBlockSpace math.LegacyDec,
) LaneConfig {
return LaneConfig{
Logger: logger,
TxEncoder: txEncoder,
TxDecoder: txDecoder,
AnteHandler: anteHandler,
MaxBlockSpace: maxBlockSpace,
}
}
// ValidateBasic validates the lane configuration.
func (c *LaneConfig) ValidateBasic() error {
if c.Logger == nil {
return fmt.Errorf("logger cannot be nil")
}
if c.TxEncoder == nil {
return fmt.Errorf("tx encoder cannot be nil")
}
if c.TxDecoder == nil {
return fmt.Errorf("tx decoder cannot be nil")
}
if c.MaxBlockSpace.IsNil() || c.MaxBlockSpace.IsNegative() || c.MaxBlockSpace.GT(math.LegacyOneDec()) {
return fmt.Errorf("max block space must be set to a value between 0 and 1")
}
return nil
}
// NoOpPrepareLanesHandler returns a no-op prepare lanes handler.
// This should only be used for testing.
func NoOpPrepareLanesHandler() PrepareLanesHandler {
return func(ctx sdk.Context, proposal BlockProposal) (BlockProposal, error) {
return proposal, nil
}
}
// NoOpPrepareLaneHandler returns a no-op prepare lane handler.
// This should only be used for testing.
func NoOpPrepareLaneHandler() PrepareLaneHandler {
return func(ctx sdk.Context, proposal BlockProposal, maxTxBytes int64) (txsToInclude [][]byte, txsToRemove []sdk.Tx, err error) {
return nil, nil, nil
}
}
// PanicPrepareLaneHandler returns a prepare lane handler that panics.
// This should only be used for testing.
func PanicPrepareLaneHandler() PrepareLaneHandler {
return func(sdk.Context, BlockProposal, int64) (txsToInclude [][]byte, txsToRemove []sdk.Tx, err error) {
panic("panic prepare lanes handler")
}
}
// NoOpProcessLanesHandler returns a no-op process lanes handler.
// This should only be used for testing.
func NoOpProcessLanesHandler() ProcessLanesHandler {
return func(ctx sdk.Context, txs []sdk.Tx) (sdk.Context, error) {
return ctx, nil
}
}
// NoOpProcessLaneHandler returns a no-op process lane handler.
// This should only be used for testing.
func NoOpProcessLaneHandler() ProcessLaneHandler {
return func(ctx sdk.Context, txs []sdk.Tx) ([]sdk.Tx, error) {
return txs, nil
}
}
// PanicProcessLanesHandler returns a process lanes handler that panics.
// This should only be used for testing.
func PanicProcessLaneHandler() ProcessLaneHandler {
return func(sdk.Context, []sdk.Tx) ([]sdk.Tx, error) {
panic("panic process lanes handler")
}
}

View File

@ -40,8 +40,8 @@ func GetDecodedTxs(txDecoder sdk.TxDecoder, txs [][]byte) ([]sdk.Tx, error) {
}
// RemoveTxsFromLane removes the transactions from the given lane's mempool.
func RemoveTxsFromLane(txs map[sdk.Tx]struct{}, mempool sdkmempool.Mempool) error {
for tx := range txs {
func RemoveTxsFromLane(txs []sdk.Tx, mempool sdkmempool.Mempool) error {
for _, tx := range txs {
if err := mempool.Remove(tx); err != nil {
return err
}