refactor(server/v2): Update prepare & process proposal (#21237)

This commit is contained in:
Hieu Vu 2024-09-04 14:58:07 +07:00 committed by GitHub
parent 3bb9528e59
commit 95b8092cbd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 57 additions and 52 deletions

View File

@ -143,7 +143,8 @@ func (a AppManager[T]) ValidateTx(ctx context.Context, tx T) (server.TxResult, e
if err != nil {
return server.TxResult{}, err
}
return a.stf.ValidateTx(ctx, latestState, a.config.ValidateTxGasLimit, tx), nil
res := a.stf.ValidateTx(ctx, latestState, a.config.ValidateTxGasLimit, tx)
return res, res.Error
}
// Simulate runs validation and execution flow of a Tx.

View File

@ -325,17 +325,8 @@ func (c *Consensus[T]) PrepareProposal(
return nil, errors.New("PrepareProposal called with invalid height")
}
decodedTxs := make([]T, len(req.Txs))
for i, tx := range req.Txs {
decTx, err := c.txCodec.Decode(tx)
if err != nil {
// TODO: vote extension meta data as a custom type to avoid possibly accepting invalid txs
// continue even if tx decoding fails
c.logger.Error("failed to decode tx", "err", err)
continue
}
decodedTxs[i] = decTx
if c.prepareProposalHandler == nil {
return nil, errors.New("no prepare proposal function was set")
}
ciCtx := contextWithCometInfo(ctx, comet.Info{
@ -345,7 +336,7 @@ func (c *Consensus[T]) PrepareProposal(
LastCommit: toCoreExtendedCommitInfo(req.LocalLastCommit),
})
txs, err := c.prepareProposalHandler(ciCtx, c.app, decodedTxs, req)
txs, err := c.prepareProposalHandler(ciCtx, c.app, c.txCodec, req)
if err != nil {
return nil, err
}
@ -366,16 +357,12 @@ func (c *Consensus[T]) ProcessProposal(
ctx context.Context,
req *abciproto.ProcessProposalRequest,
) (*abciproto.ProcessProposalResponse, error) {
decodedTxs := make([]T, len(req.Txs))
for _, tx := range req.Txs {
decTx, err := c.txCodec.Decode(tx)
if err != nil {
// TODO: vote extension meta data as a custom type to avoid possibly accepting invalid txs
// continue even if tx decoding fails
c.logger.Error("failed to decode tx", "err", err)
continue
}
decodedTxs = append(decodedTxs, decTx)
if req.Height < 1 {
return nil, errors.New("ProcessProposal called with invalid height")
}
if c.processProposalHandler == nil {
return nil, errors.New("no process proposal function was set")
}
ciCtx := contextWithCometInfo(ctx, comet.Info{
@ -385,7 +372,7 @@ func (c *Consensus[T]) ProcessProposal(
LastCommit: toCoreCommitInfo(req.ProposedLastCommit),
})
err := c.processProposalHandler(ciCtx, c.app, decodedTxs, req)
err := c.processProposalHandler(ciCtx, c.app, c.txCodec, req)
if err != nil {
c.logger.Error("failed to process proposal", "height", req.Height, "time", req.Time, "hash", fmt.Sprintf("%X", req.Hash), "err", err)
return &abciproto.ProcessProposalResponse{

View File

@ -6,13 +6,12 @@ import (
"fmt"
abci "github.com/cometbft/cometbft/api/cometbft/abci/v1"
"github.com/cosmos/gogoproto/proto"
consensusv1 "cosmossdk.io/api/cosmos/consensus/v1"
"cosmossdk.io/core/server"
"cosmossdk.io/core/store"
"cosmossdk.io/core/transaction"
"cosmossdk.io/server/v2/cometbft/mempool"
consensustypes "cosmossdk.io/x/consensus/types"
)
type AppManager[T transaction.Tx] interface {
@ -33,28 +32,25 @@ func NewDefaultProposalHandler[T transaction.Tx](mp mempool.Mempool[T]) *Default
}
func (h *DefaultProposalHandler[T]) PrepareHandler() PrepareHandler[T] {
return func(ctx context.Context, app AppManager[T], txs []T, req proto.Message) ([]T, error) {
abciReq, ok := req.(*abci.PrepareProposalRequest)
if !ok {
return nil, fmt.Errorf("expected abci.PrepareProposalRequest, invalid request type: %T,", req)
}
return func(ctx context.Context, app AppManager[T], codec transaction.Codec[T], req *abci.PrepareProposalRequest) ([]T, error) {
var maxBlockGas uint64
res, err := app.Query(ctx, 0, &consensusv1.QueryParamsRequest{})
res, err := app.Query(ctx, 0, &consensustypes.QueryParamsRequest{})
if err != nil {
return nil, err
}
paramsResp, ok := res.(*consensusv1.QueryParamsResponse)
paramsResp, ok := res.(*consensustypes.QueryParamsResponse)
if !ok {
return nil, fmt.Errorf("unexpected consensus params response type; expected: %T, got: %T", &consensusv1.QueryParamsResponse{}, res)
return nil, fmt.Errorf("unexpected consensus params response type; expected: %T, got: %T", &consensustypes.QueryParamsResponse{}, res)
}
if b := paramsResp.GetParams().Block; b != nil {
maxBlockGas = uint64(b.MaxGas)
}
txs := decodeTxs(codec, req.Txs)
defer h.txSelector.Clear()
// If the mempool is nil or NoOp we simply return the transactions
@ -64,7 +60,7 @@ func (h *DefaultProposalHandler[T]) PrepareHandler() PrepareHandler[T] {
_, isNoOp := h.mempool.(mempool.NoOpMempool[T])
if h.mempool == nil || isNoOp {
for _, tx := range txs {
stop := h.txSelector.SelectTxForProposal(ctx, uint64(abciReq.MaxTxBytes), maxBlockGas, tx)
stop := h.txSelector.SelectTxForProposal(ctx, uint64(req.MaxTxBytes), maxBlockGas, tx)
if stop {
break
}
@ -88,7 +84,7 @@ func (h *DefaultProposalHandler[T]) PrepareHandler() PrepareHandler[T] {
return nil, err
}
} else {
stop := h.txSelector.SelectTxForProposal(ctx, uint64(abciReq.MaxTxBytes), maxBlockGas, memTx)
stop := h.txSelector.SelectTxForProposal(ctx, uint64(req.MaxTxBytes), maxBlockGas, memTx)
if stop {
break
}
@ -102,7 +98,7 @@ func (h *DefaultProposalHandler[T]) PrepareHandler() PrepareHandler[T] {
}
func (h *DefaultProposalHandler[T]) ProcessHandler() ProcessHandler[T] {
return func(ctx context.Context, app AppManager[T], txs []T, req proto.Message) error {
return func(ctx context.Context, app AppManager[T], codec transaction.Codec[T], req *abci.ProcessProposalRequest) error {
// If the mempool is nil we simply return ACCEPT,
// because PrepareProposal may have included txs that could fail verification.
_, isNoOp := h.mempool.(mempool.NoOpMempool[T])
@ -110,19 +106,14 @@ func (h *DefaultProposalHandler[T]) ProcessHandler() ProcessHandler[T] {
return nil
}
_, ok := req.(*abci.PrepareProposalRequest)
if !ok {
return fmt.Errorf("invalid request type: %T", req)
}
res, err := app.Query(ctx, 0, &consensusv1.QueryParamsRequest{})
res, err := app.Query(ctx, 0, &consensustypes.QueryParamsRequest{})
if err != nil {
return err
}
paramsResp, ok := res.(*consensusv1.QueryParamsResponse)
paramsResp, ok := res.(*consensustypes.QueryParamsResponse)
if !ok {
return fmt.Errorf("unexpected consensus params response type; expected: %T, got: %T", &consensusv1.QueryParamsResponse{}, res)
return fmt.Errorf("unexpected consensus params response type; expected: %T, got: %T", &consensustypes.QueryParamsResponse{}, res)
}
var maxBlockGas uint64
@ -130,6 +121,17 @@ func (h *DefaultProposalHandler[T]) ProcessHandler() ProcessHandler[T] {
maxBlockGas = uint64(b.MaxGas)
}
// Decode request txs bytes
// If there an tx decoded fail, return err
var txs []T
for _, tx := range req.Txs {
decTx, err := codec.Decode(tx)
if err != nil {
return fmt.Errorf("failed to decode tx: %w", err)
}
txs = append(txs, decTx)
}
var totalTxGas uint64
for _, tx := range txs {
_, err := app.ValidateTx(ctx, tx)
@ -153,18 +155,34 @@ func (h *DefaultProposalHandler[T]) ProcessHandler() ProcessHandler[T] {
}
}
// decodeTxs decodes the txs bytes into a decoded txs
// If there a fail decoding tx, remove from the list
// Used for prepare proposal
func decodeTxs[T transaction.Tx](codec transaction.Codec[T], txsBz [][]byte) []T {
var txs []T
for _, tx := range txsBz {
decTx, err := codec.Decode(tx)
if err != nil {
continue
}
txs = append(txs, decTx)
}
return txs
}
// NoOpPrepareProposal defines a no-op PrepareProposal handler. It will always
// return the transactions sent by the client's request.
func NoOpPrepareProposal[T transaction.Tx]() PrepareHandler[T] {
return func(ctx context.Context, app AppManager[T], txs []T, req proto.Message) ([]T, error) {
return txs, nil
return func(ctx context.Context, app AppManager[T], codec transaction.Codec[T], req *abci.PrepareProposalRequest) ([]T, error) {
return decodeTxs(codec, req.Txs), nil
}
}
// NoOpProcessProposal defines a no-op ProcessProposal Handler. It will always
// return ACCEPT.
func NoOpProcessProposal[T transaction.Tx]() ProcessHandler[T] {
return func(context.Context, AppManager[T], []T, proto.Message) error {
return func(context.Context, AppManager[T], transaction.Codec[T], *abci.ProcessProposalRequest) error {
return nil
}
}

View File

@ -4,7 +4,6 @@ import (
"context"
abci "github.com/cometbft/cometbft/api/cometbft/abci/v1"
"github.com/cosmos/gogoproto/proto"
"cosmossdk.io/core/store"
"cosmossdk.io/core/transaction"
@ -13,11 +12,11 @@ import (
type (
// PrepareHandler passes in the list of Txs that are being proposed. The app can then do stateful operations
// over the list of proposed transactions. It can return a modified list of txs to include in the proposal.
PrepareHandler[T transaction.Tx] func(context.Context, AppManager[T], []T, proto.Message) ([]T, error)
PrepareHandler[T transaction.Tx] func(context.Context, AppManager[T], transaction.Codec[T], *abci.PrepareProposalRequest) ([]T, error)
// ProcessHandler is a function that takes a list of transactions and returns a boolean and an error.
// If the verification of a transaction fails, the boolean is false and the error is non-nil.
ProcessHandler[T transaction.Tx] func(context.Context, AppManager[T], []T, proto.Message) error
ProcessHandler[T transaction.Tx] func(context.Context, AppManager[T], transaction.Codec[T], *abci.ProcessProposalRequest) error
// VerifyVoteExtensionhandler is a function type that handles the verification of a vote extension request.
// It takes a context, a store reader map, and a request to verify a vote extension.