feat(BB): Creating a chain decorator for process lanes (#162)

This commit is contained in:
David Terpay 2023-06-05 12:18:06 -04:00 committed by GitHub
parent 2a0af8d67b
commit 1b928d883a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 339 additions and 102 deletions

View File

@ -10,7 +10,6 @@ run:
linters:
disable-all: true
enable:
- depguard
- dogsled
- exportloopref
- goconst

View File

@ -1,70 +0,0 @@
package blockbuster
import (
"crypto/sha256"
"encoding/hex"
abci "github.com/cometbft/cometbft/abci/types"
"github.com/cometbft/cometbft/libs/log"
sdk "github.com/cosmos/cosmos-sdk/types"
)
type ProposalHandler struct {
logger log.Logger
mempool Mempool
}
func NewProposalHandler(logger log.Logger, mempool Mempool) *ProposalHandler {
return &ProposalHandler{
logger: logger,
mempool: mempool,
}
}
func (h *ProposalHandler) PrepareProposalHandler() sdk.PrepareProposalHandler {
return func(ctx sdk.Context, req abci.RequestPrepareProposal) abci.ResponsePrepareProposal {
var (
selectedTxs = make(map[string][]byte)
totalTxBytes int64
)
for _, l := range h.mempool.registry {
if totalTxBytes < req.MaxTxBytes {
laneTxs, err := l.PrepareLane(ctx, req.MaxTxBytes, selectedTxs)
if err != nil {
h.logger.Error("failed to prepare lane; skipping", "lane", l.Name(), "err", err)
continue
}
for _, txBz := range laneTxs {
totalTxBytes += int64(len(txBz))
txHash := sha256.Sum256(txBz)
txHashStr := hex.EncodeToString(txHash[:])
selectedTxs[txHashStr] = txBz
}
}
}
proposalTxs := make([][]byte, 0, len(selectedTxs))
for _, txBz := range selectedTxs {
proposalTxs = append(proposalTxs, txBz)
}
return abci.ResponsePrepareProposal{Txs: proposalTxs}
}
}
func (h *ProposalHandler) ProcessProposalHandler() sdk.ProcessProposalHandler {
return func(ctx sdk.Context, req abci.RequestProcessProposal) abci.ResponseProcessProposal {
for _, l := range h.mempool.registry {
if err := l.ProcessLane(ctx, req.Txs); err != nil {
h.logger.Error("failed to process lane", "lane", l.Name(), "err", err)
return abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_REJECT}
}
}
return abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_ACCEPT}
}
}

101
blockbuster/abci/abci.go Normal file
View File

@ -0,0 +1,101 @@
package abci
import (
"crypto/sha256"
"encoding/hex"
abci "github.com/cometbft/cometbft/abci/types"
"github.com/cometbft/cometbft/libs/log"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/skip-mev/pob/blockbuster"
"github.com/skip-mev/pob/blockbuster/lanes/terminator"
)
type (
// ProposalHandler is a wrapper around the ABCI++ PrepareProposal and ProcessProposal
// handlers.
ProposalHandler struct {
logger log.Logger
mempool blockbuster.Mempool
txEncoder sdk.TxEncoder
processLanesHandler blockbuster.ProcessLanesHandler
}
)
// NewProposalHandler returns a new ProposalHandler.
func NewProposalHandler(logger log.Logger, mempool blockbuster.Mempool, txEncoder sdk.TxEncoder) *ProposalHandler {
return &ProposalHandler{
logger: logger,
mempool: mempool,
txEncoder: txEncoder,
processLanesHandler: ChainProcessLanes(mempool.Registry()...),
}
}
// ChainProcessLane chains together the proposal verification logic from each lane
// into a single function. The first lane in the chain is the first lane to be verified and
// the last lane in the chain is the last lane to be verified.
func ChainProcessLanes(chain ...blockbuster.Lane) blockbuster.ProcessLanesHandler {
if len(chain) == 0 {
return nil
}
// Handle non-terminated decorators chain
if (chain[len(chain)-1] != terminator.Terminator{}) {
chain = append(chain, terminator.Terminator{})
}
return func(ctx sdk.Context, proposalTxs [][]byte) (sdk.Context, error) {
return chain[0].ProcessLane(ctx, proposalTxs, ChainProcessLanes(chain[1:]...))
}
}
func (h *ProposalHandler) PrepareProposalHandler() sdk.PrepareProposalHandler {
return func(ctx sdk.Context, req abci.RequestPrepareProposal) abci.ResponsePrepareProposal {
var (
selectedTxs = make(map[string][]byte)
totalTxBytes int64
)
for _, l := range h.mempool.Registry() {
if totalTxBytes < req.MaxTxBytes {
laneTxs, err := l.PrepareLane(ctx, req.MaxTxBytes, selectedTxs)
if err != nil {
h.logger.Error("failed to prepare lane; skipping", "lane", l.Name(), "err", err)
continue
}
for _, txBz := range laneTxs {
totalTxBytes += int64(len(txBz))
txHash := sha256.Sum256(txBz)
txHashStr := hex.EncodeToString(txHash[:])
selectedTxs[txHashStr] = txBz
}
}
}
proposalTxs := make([][]byte, 0, len(selectedTxs))
for _, txBz := range selectedTxs {
proposalTxs = append(proposalTxs, txBz)
}
return abci.ResponsePrepareProposal{Txs: proposalTxs}
}
}
// ProcessProposalHandler processes the proposal by verifying all transactions in the proposal
// according to each lane's verification logic. We verify proposals in a greedy fashion.
// If a lane's portion of the proposal is invalid, we reject the proposal. After a lane's portion
// of the proposal is verified, we pass the remaining transactions to the next lane in the chain.
func (h *ProposalHandler) ProcessProposalHandler() sdk.ProcessProposalHandler {
return func(ctx sdk.Context, req abci.RequestProcessProposal) abci.ResponseProcessProposal {
if _, err := h.processLanesHandler(ctx, req.Txs); err != nil {
h.logger.Error("failed to process lanes", "err", err)
return abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_REJECT}
}
return abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_ACCEPT}
}
}

View File

@ -7,6 +7,12 @@ import (
)
type (
// ProcessLanesHandler wraps all of the lanes Process functions into a single chained
// function. You can think of it like an AnteHandler, but for processing proposals in the
// context of lanes instead of modules.
ProcessLanesHandler func(ctx sdk.Context, proposalTxs [][]byte) (sdk.Context, error)
// BaseLaneConfig defines the basic functionality needed for a lane.
BaseLaneConfig struct {
Logger log.Logger
@ -37,8 +43,8 @@ type (
// the raw transaction.
PrepareLane(ctx sdk.Context, maxTxBytes int64, selectedTxs map[string][]byte) ([][]byte, error)
// ProcessLane which verifies the lane's portion of a proposed block.
ProcessLane(ctx sdk.Context, proposalTxs [][]byte) error
// ProcessLane verifies this lane's portion of a proposed block.
ProcessLane(ctx sdk.Context, proposalTxs [][]byte, next ProcessLanesHandler) (sdk.Context, error)
}
)

View File

@ -73,6 +73,17 @@ selectBidTxLoop:
continue selectBidTxLoop
}
hash, err := blockbuster.GetTxHashStr(l.cfg.TxEncoder, sdkTx)
if err != nil {
txsToRemove[tmpBidTx] = struct{}{}
continue selectBidTxLoop
}
// if the transaction is already in the (partial) block proposal, we skip it.
if _, ok := selectedTxs[hash]; ok {
continue selectBidTxLoop
}
bundleTxBz := make([]byte, len(sdkTxBz))
copy(bundleTxBz, sdkTxBz)
bundledTxBz[index] = sdkTxBz
@ -115,16 +126,19 @@ selectBidTxLoop:
// block proposal is invalid. The block proposal is invalid if it does not
// respect the ordering of transactions in the bid transaction or if the bid/bundled
// transactions are invalid.
func (l *TOBLane) ProcessLane(ctx sdk.Context, proposalTxs [][]byte) error {
func (l *TOBLane) ProcessLane(ctx sdk.Context, proposalTxs [][]byte, next blockbuster.ProcessLanesHandler) (sdk.Context, error) {
// Track the index of the first transaction that does not belong to this lane.
endIndex := 0
for index, txBz := range proposalTxs {
tx, err := l.cfg.TxDecoder(txBz)
if err != nil {
return err
return ctx, err
}
bidInfo, err := l.GetAuctionBidInfo(tx)
if err != nil {
return err
return ctx, fmt.Errorf("failed to get auction bid info for tx %w", err)
}
// If the transaction is an auction bid, then we need to ensure that it is
@ -133,12 +147,12 @@ func (l *TOBLane) ProcessLane(ctx sdk.Context, proposalTxs [][]byte) error {
// the bid.
if bidInfo != nil {
if index != 0 {
return errors.New("auction bid must be the first transaction in the block proposal")
return ctx, fmt.Errorf("block proposal did not place auction bid transaction at the top of the lane: %d", index)
}
bundledTransactions := bidInfo.Transactions
if len(proposalTxs) < len(bundledTransactions)+1 {
return errors.New("block proposal does not contain enough transactions to match the bundled transactions in the auction bid")
return ctx, errors.New("block proposal does not contain enough transactions to match the bundled transactions in the auction bid")
}
for i, refTxRaw := range bundledTransactions {
@ -146,27 +160,29 @@ func (l *TOBLane) ProcessLane(ctx sdk.Context, proposalTxs [][]byte) error {
// reference transaction can be processed as an sdk.Tx.
wrappedTx, err := l.WrapBundleTransaction(refTxRaw)
if err != nil {
return err
return ctx, err
}
refTxBz, err := l.cfg.TxEncoder(wrappedTx)
if err != nil {
return err
return ctx, err
}
if !bytes.Equal(refTxBz, proposalTxs[i+1]) {
return errors.New("block proposal does not match the bundled transactions in the auction bid")
return ctx, errors.New("block proposal does not match the bundled transactions in the auction bid")
}
}
// Verify the bid transaction.
if err = l.VerifyTx(ctx, tx); err != nil {
return err
return ctx, err
}
endIndex += len(bundledTransactions) + 1
}
}
return nil
return next(ctx, proposalTxs[endIndex:])
}
// VerifyTx will verify that the bid transaction and all of its bundled

View File

@ -1,15 +1,90 @@
package base
import sdk "github.com/cosmos/cosmos-sdk/types"
import (
"fmt"
func (l *DefaultLane) PrepareLane(sdk.Context, int64, map[string][]byte) ([][]byte, error) {
panic("implement me")
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/skip-mev/pob/blockbuster"
)
// PrepareLane will prepare a partial proposal for the base lane.
func (l *DefaultLane) PrepareLane(ctx sdk.Context, maxTxBytes int64, selectedTxs map[string][]byte) ([][]byte, error) {
txs := make([][]byte, 0)
txsToRemove := make(map[sdk.Tx]struct{}, 0)
totalSize := int64(0)
// Select all transactions in the mempool that are valid and not already in the
// partial proposal.
for iterator := l.Mempool.Select(ctx, nil); iterator != nil; iterator = iterator.Next() {
tx := iterator.Tx()
txBytes, err := l.cfg.TxEncoder(tx)
if err != nil {
txsToRemove[tx] = struct{}{}
continue
}
// if the transaction is already in the (partial) block proposal, we skip it.
hash, err := blockbuster.GetTxHashStr(l.cfg.TxEncoder, tx)
if err != nil {
txsToRemove[tx] = struct{}{}
continue
}
if _, ok := selectedTxs[hash]; ok {
continue
}
// If the transaction is too large, we skip it.
txSize := int64(len(txBytes))
if updatedSize := totalSize + txSize; updatedSize > maxTxBytes {
break
}
// Verify the transaction.
if err := l.VerifyTx(ctx, tx); err != nil {
txsToRemove[tx] = struct{}{}
continue
}
totalSize += txSize
txs = append(txs, txBytes)
}
// Remove all transactions that were invalid during the creation of the partial proposal.
if err := blockbuster.RemoveTxsFromLane(txsToRemove, l.Mempool); err != nil {
return nil, fmt.Errorf("failed to remove txs from mempool for lane %s: %w", l.Name(), err)
}
return txs, nil
}
func (l *DefaultLane) ProcessLane(sdk.Context, [][]byte) error {
panic("implement me")
// ProcessLane verifies the default lane's portion of a block proposal.
func (l *DefaultLane) ProcessLane(ctx sdk.Context, proposalTxs [][]byte, next blockbuster.ProcessLanesHandler) (sdk.Context, error) {
for index, tx := range proposalTxs {
tx, err := l.cfg.TxDecoder(tx)
if err != nil {
return ctx, fmt.Errorf("failed to decode tx: %w", err)
}
if l.Match(tx) {
if err := l.VerifyTx(ctx, tx); err != nil {
return ctx, fmt.Errorf("failed to verify tx: %w", err)
}
} else {
return next(ctx, proposalTxs[index:])
}
}
// This means we have processed all transactions in the proposal.
return ctx, nil
}
func (l *DefaultLane) VerifyTx(sdk.Context, sdk.Tx) error {
panic("implement me")
// VerifyTx does basic verification of the transaction using the ante handler.
func (l *DefaultLane) VerifyTx(ctx sdk.Context, tx sdk.Tx) error {
if l.cfg.AnteHandler != nil {
_, err := l.cfg.AnteHandler(ctx, tx, false)
return err
}
return nil
}

View File

@ -0,0 +1,84 @@
package terminator
import (
"context"
"fmt"
sdk "github.com/cosmos/cosmos-sdk/types"
sdkmempool "github.com/cosmos/cosmos-sdk/types/mempool"
"github.com/skip-mev/pob/blockbuster"
)
// Terminator Lane will get added to the chain to simplify chaining code so that we
// don't need to check if next == nil further up the chain
//
// sniped from the sdk
//
// ______
// <((((((\\\
// / . }\
// ;--..--._|}
// (\ '--/\--' )
// \\ | '-' :'|
// \\ . -==- .-|
// \\ \.__.' \--._
// [\\ __.--| // _/'--.
// \ \\ .'-._ ('-----'/ __/ \
// \ \\ / __>| | '--. |
// \ \\ | \ | / / /
// \ '\ / \ | | _/ /
// \ \ \ | | / /
// snd \ \ \ /
type Terminator struct{}
var _ blockbuster.Lane = (*Terminator)(nil)
// PrepareLane is a no-op
func (t Terminator) PrepareLane(_ sdk.Context, _ int64, _ map[string][]byte) ([][]byte, error) {
return nil, nil
}
// ProcessLane is a no-op
func (t Terminator) ProcessLane(ctx sdk.Context, _ [][]byte, _ blockbuster.ProcessLanesHandler) (sdk.Context, error) {
return ctx, nil
}
// Name returns the name of the lane
func (t Terminator) Name() string {
return "Terminator"
}
// Match is a no-op
func (t Terminator) Match(sdk.Tx) bool {
return false
}
// VerifyTx is a no-op
func (t Terminator) VerifyTx(sdk.Context, sdk.Tx) error {
return fmt.Errorf("Terminator lane should not be called")
}
// Contains is a no-op
func (t Terminator) Contains(sdk.Tx) (bool, error) {
return false, nil
}
// CountTx is a no-op
func (t Terminator) CountTx() int {
return 0
}
// Insert is a no-op
func (t Terminator) Insert(context.Context, sdk.Tx) error {
return nil
}
// Remove is a no-op
func (t Terminator) Remove(sdk.Tx) error {
return nil
}
// Select is a no-op
func (t Terminator) Select(context.Context, [][]byte) sdkmempool.Iterator {
return nil
}

View File

@ -8,23 +8,32 @@ import (
sdkmempool "github.com/cosmos/cosmos-sdk/types/mempool"
)
var _ sdkmempool.Mempool = (*Mempool)(nil)
var _ Mempool = (*BBMempool)(nil)
// Mempool defines the Blockbuster mempool implement. It contains a registry
// of lanes, which allows for customizable block proposal construction.
type Mempool struct {
registry []Lane
}
type (
Mempool interface {
sdkmempool.Mempool
func NewMempool(lanes ...Lane) *Mempool {
return &Mempool{
// Registry returns the mempool's lane registry.
Registry() []Lane
}
// Mempool defines the Blockbuster mempool implement. It contains a registry
// of lanes, which allows for customizable block proposal construction.
BBMempool struct {
registry []Lane
}
)
func NewMempool(lanes ...Lane) *BBMempool {
return &BBMempool{
registry: lanes,
}
}
// TODO: Consider using a tx cache in Mempool and returning the length of that
// cache instead of relying on lane count tracking.
func (m *Mempool) CountTx() int {
func (m *BBMempool) CountTx() int {
var total int
for _, lane := range m.registry {
// TODO: If a global lane exists, we assume that lane has all transactions
@ -42,7 +51,7 @@ func (m *Mempool) CountTx() int {
// Insert inserts a transaction into every lane that it matches. Insertion will
// be attempted on all lanes, even if an error is encountered.
func (m *Mempool) Insert(ctx context.Context, tx sdk.Tx) error {
func (m *BBMempool) Insert(ctx context.Context, tx sdk.Tx) error {
errs := make([]error, 0, len(m.registry))
for _, lane := range m.registry {
@ -61,13 +70,13 @@ func (m *Mempool) Insert(ctx context.Context, tx sdk.Tx) error {
// - Determine if it even makes sense to return an iterator. What does that even
// mean in the context where you have multiple lanes?
// - Perhaps consider implementing and returning a no-op iterator?
func (m *Mempool) Select(_ context.Context, _ [][]byte) sdkmempool.Iterator {
func (m *BBMempool) Select(_ context.Context, _ [][]byte) sdkmempool.Iterator {
return nil
}
// Remove removes a transaction from every lane that it matches. Removal will be
// attempted on all lanes, even if an error is encountered.
func (m *Mempool) Remove(tx sdk.Tx) error {
func (m *BBMempool) Remove(tx sdk.Tx) error {
errs := make([]error, 0, len(m.registry))
for _, lane := range m.registry {
@ -79,3 +88,8 @@ func (m *Mempool) Remove(tx sdk.Tx) error {
return errors.Join(errs...)
}
// Registry returns the mempool's lane registry.
func (m *BBMempool) Registry() []Lane {
return m.registry
}

View File

@ -6,6 +6,7 @@ import (
"fmt"
sdk "github.com/cosmos/cosmos-sdk/types"
sdkmempool "github.com/cosmos/cosmos-sdk/types/mempool"
)
// GetTxHashStr returns the hex-encoded hash of the transaction.
@ -20,3 +21,14 @@ func GetTxHashStr(txEncoder sdk.TxEncoder, tx sdk.Tx) (string, error) {
return txHashStr, nil
}
// RemoveTxsFromLane removes the transactions from the given lane's mempool.
func RemoveTxsFromLane(txs map[sdk.Tx]struct{}, mempool sdkmempool.Mempool) error {
for tx := range txs {
if err := mempool.Remove(tx); err != nil {
return err
}
}
return nil
}