feat(server/v2/cometbft): optimistic execution (#22560)

Co-authored-by: Randy Grok <@faulttolerance.net>
This commit is contained in:
Randy Grok 2024-11-28 13:29:35 +01:00 committed by GitHub
parent ca48cef255
commit 9caec0658d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 418 additions and 48 deletions

View File

@ -11,7 +11,7 @@ import (
abci "github.com/cometbft/cometbft/abci/types"
abciproto "github.com/cometbft/cometbft/api/cometbft/abci/v1"
gogoproto "github.com/cosmos/gogoproto/proto"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/reflect/protoregistry"
"cosmossdk.io/collections"
@ -28,6 +28,7 @@ import (
"cosmossdk.io/server/v2/appmanager"
"cosmossdk.io/server/v2/cometbft/handlers"
"cosmossdk.io/server/v2/cometbft/mempool"
"cosmossdk.io/server/v2/cometbft/oe"
"cosmossdk.io/server/v2/cometbft/types"
cometerrors "cosmossdk.io/server/v2/cometbft/types/errors"
"cosmossdk.io/server/v2/streaming"
@ -77,6 +78,11 @@ type consensus[T transaction.Tx] struct {
extendVote handlers.ExtendVoteHandler
checkTxHandler handlers.CheckTxHandler[T]
// optimisticExec contains the context required for Optimistic Execution,
// including the goroutine handling.This is experimental and must be enabled
// by developers.
optimisticExec *oe.OptimisticExecution[T]
addrPeerFilter types.PeerFilter // filter peers by address and port
idPeerFilter types.PeerFilter // filter peers by node ID
@ -385,6 +391,14 @@ func (c *consensus[T]) PrepareProposal(
return nil, errors.New("no prepare proposal function was set")
}
// Abort any running OE so it cannot overlap with `PrepareProposal`. This could happen if optimistic
// `internalFinalizeBlock` from previous round takes a long time, but consensus has moved on to next round.
// Overlap is undesirable, since `internalFinalizeBlock` and `PrepareProoposal` could share access to
// in-memory structs depending on application implementation.
// No-op if OE is not enabled.
// Similar call to Abort() is done in `ProcessProposal`.
c.optimisticExec.Abort()
ciCtx := contextWithCometInfo(ctx, comet.Info{
Evidence: toCoreEvidence(req.Misbehavior),
ValidatorsHash: req.NextValidatorsHash,
@ -421,6 +435,16 @@ func (c *consensus[T]) ProcessProposal(
return nil, errors.New("no process proposal function was set")
}
// Since the application can get access to FinalizeBlock state and write to it,
// we must be sure to reset it in case ProcessProposal timeouts and is called
// again in a subsequent round. However, we only want to do this after we've
// processed the first block, as we want to avoid overwriting the finalizeState
// after state changes during InitChain.
if req.Height > int64(c.initialHeight) {
// abort any running OE
c.optimisticExec.Abort()
}
ciCtx := contextWithCometInfo(ctx, comet.Info{
Evidence: toCoreEvidence(req.Misbehavior),
ValidatorsHash: req.NextValidatorsHash,
@ -436,6 +460,17 @@ func (c *consensus[T]) ProcessProposal(
}, nil
}
// Only execute optimistic execution if the proposal is accepted, OE is
// enabled and the block height is greater than the initial height. During
// the first block we'll be carrying state from InitChain, so it would be
// impossible for us to easily revert.
// After the first block has been processed, the next blocks will get executed
// optimistically, so that when the ABCI client calls `FinalizeBlock` the app
// can have a response ready.
if req.Height > int64(c.initialHeight) {
c.optimisticExec.Execute(req)
}
return &abciproto.ProcessProposalResponse{
Status: abciproto.PROCESS_PROPOSAL_STATUS_ACCEPT,
}, nil
@ -447,46 +482,40 @@ func (c *consensus[T]) FinalizeBlock(
ctx context.Context,
req *abciproto.FinalizeBlockRequest,
) (*abciproto.FinalizeBlockResponse, error) {
if err := c.validateFinalizeBlockHeight(req); err != nil {
return nil, err
var (
resp *server.BlockResponse
newState store.WriterMap
decodedTxs []T
err error
)
if c.optimisticExec.Initialized() {
// check if the hash we got is the same as the one we are executing
aborted := c.optimisticExec.AbortIfNeeded(req.Hash)
// Wait for the OE to finish, regardless of whether it was aborted or not
res, optimistErr := c.optimisticExec.WaitResult()
if !aborted {
if res != nil {
resp = res.Resp
newState = res.StateChanges
decodedTxs = res.DecodedTxs
}
if optimistErr != nil {
return nil, optimistErr
}
}
c.optimisticExec.Reset()
}
if err := c.checkHalt(req.Height, req.Time); err != nil {
return nil, err
}
// TODO(tip): can we expect some txs to not decode? if so, what we do in this case? this does not seem to be the case,
// considering that prepare and process always decode txs, assuming they're the ones providing txs we should never
// have a tx that fails decoding.
decodedTxs, err := decodeTxs(req.Txs, c.txCodec)
if err != nil {
return nil, err
}
cid, err := c.store.LastCommitID()
if err != nil {
return nil, err
}
blockReq := &server.BlockRequest[T]{
Height: uint64(req.Height),
Time: req.Time,
Hash: req.Hash,
AppHash: cid.Hash,
ChainId: c.chainID,
Txs: decodedTxs,
}
ciCtx := contextWithCometInfo(ctx, comet.Info{
Evidence: toCoreEvidence(req.Misbehavior),
ValidatorsHash: req.NextValidatorsHash,
ProposerAddress: req.ProposerAddress,
LastCommit: toCoreCommitInfo(req.DecidedLastCommit),
})
resp, newState, err := c.app.DeliverBlock(ciCtx, blockReq)
if err != nil {
return nil, err
if resp == nil { // if we didn't run OE, run the normal finalize block
resp, newState, decodedTxs, err = c.internalFinalizeBlock(ctx, req)
if err != nil {
return nil, err
}
}
// after we get the changeset we can produce the commit hash,
@ -531,6 +560,52 @@ func (c *consensus[T]) FinalizeBlock(
return finalizeBlockResponse(resp, cp, appHash, c.indexedEvents, c.cfg.AppTomlConfig.Trace)
}
func (c *consensus[T]) internalFinalizeBlock(
ctx context.Context,
req *abciproto.FinalizeBlockRequest,
) (*server.BlockResponse, store.WriterMap, []T, error) {
if err := c.validateFinalizeBlockHeight(req); err != nil {
return nil, nil, nil, err
}
if err := c.checkHalt(req.Height, req.Time); err != nil {
return nil, nil, nil, err
}
// TODO(tip): can we expect some txs to not decode? if so, what we do in this case? this does not seem to be the case,
// considering that prepare and process always decode txs, assuming they're the ones providing txs we should never
// have a tx that fails decoding.
decodedTxs, err := decodeTxs(req.Txs, c.txCodec)
if err != nil {
return nil, nil, nil, err
}
cid, err := c.store.LastCommitID()
if err != nil {
return nil, nil, nil, err
}
blockReq := &server.BlockRequest[T]{
Height: uint64(req.Height),
Time: req.Time,
Hash: req.Hash,
AppHash: cid.Hash,
ChainId: c.chainID,
Txs: decodedTxs,
}
ciCtx := contextWithCometInfo(ctx, comet.Info{
Evidence: toCoreEvidence(req.Misbehavior),
ValidatorsHash: req.NextValidatorsHash,
ProposerAddress: req.ProposerAddress,
LastCommit: toCoreCommitInfo(req.DecidedLastCommit),
})
resp, stateChanges, err := c.app.DeliverBlock(ciCtx, blockReq)
return resp, stateChanges, decodedTxs, err
}
// Commit implements types.Application.
// It is called by cometbft to notify the application that a block was committed.
func (c *consensus[T]) Commit(ctx context.Context, _ *abciproto.CommitRequest) (*abciproto.CommitResponse, error) {

View File

@ -2,14 +2,18 @@ package cometbft
import (
"context"
"cosmossdk.io/core/server"
"crypto/sha256"
"encoding/json"
"errors"
abci "github.com/cometbft/cometbft/abci/types"
"io"
"strings"
"sync"
"testing"
"time"
"cosmossdk.io/server/v2/cometbft/oe"
abciproto "github.com/cometbft/cometbft/api/cometbft/abci/v1"
v1 "github.com/cometbft/cometbft/api/cometbft/types/v1"
"github.com/cosmos/gogoproto/proto"
@ -56,10 +60,10 @@ func getQueryRouterBuilder[T any, PT interface {
*T
proto.Message
},
U any, UT interface {
*U
proto.Message
}](
U any, UT interface {
*U
proto.Message
}](
t *testing.T,
handler func(ctx context.Context, msg PT) (UT, error),
) *stf.MsgRouterBuilder {
@ -86,10 +90,10 @@ func getMsgRouterBuilder[T any, PT interface {
*T
transaction.Msg
},
U any, UT interface {
*U
transaction.Msg
}](
U any, UT interface {
*U
transaction.Msg
}](
t *testing.T,
handler func(ctx context.Context, msg PT) (UT, error),
) *stf.MsgRouterBuilder {
@ -514,6 +518,12 @@ func TestConsensus_ProcessProposal(t *testing.T) {
require.Error(t, err)
// NoOp handler
// dummy optimistic execution
optimisticMockFunc := func(context.Context, *abci.FinalizeBlockRequest) (*server.BlockResponse, store.WriterMap, []mock.Tx, error) {
return nil, nil, nil, errors.New("test error")
}
c.optimisticExec = oe.NewOptimisticExecution[mock.Tx](log.NewNopLogger(), optimisticMockFunc)
c.processProposalHandler = DefaultServerOptions[mock.Tx]().ProcessProposalHandler
_, err = c.ProcessProposal(context.Background(), &abciproto.ProcessProposalRequest{
Height: 1,
@ -724,3 +734,76 @@ func assertStoreLatestVersion(t *testing.T, store types.Store, target uint64) {
require.NoError(t, err)
require.Equal(t, target, commitInfo.Version)
}
func TestOptimisticExecution(t *testing.T) {
c := setUpConsensus(t, 100_000, mempool.NoOpMempool[mock.Tx]{})
// Set up handlers
c.processProposalHandler = DefaultServerOptions[mock.Tx]().ProcessProposalHandler
// mock optimistic execution
calledTimes := 0
optimisticMockFunc := func(context.Context, *abci.FinalizeBlockRequest) (*server.BlockResponse, store.WriterMap, []mock.Tx, error) {
calledTimes++
return nil, nil, nil, errors.New("test error")
}
c.optimisticExec = oe.NewOptimisticExecution[mock.Tx](log.NewNopLogger(), optimisticMockFunc)
_, err := c.InitChain(context.Background(), &abciproto.InitChainRequest{
Time: time.Now(),
ChainId: "test",
InitialHeight: 1,
})
require.NoError(t, err)
_, err = c.FinalizeBlock(context.Background(), &abciproto.FinalizeBlockRequest{
Time: time.Now(),
Height: 1,
Txs: [][]byte{mockTx.Bytes()},
Hash: emptyHash[:],
})
require.NoError(t, err)
theHash := sha256.Sum256([]byte("test"))
ppReq := &abciproto.ProcessProposalRequest{
Height: 2,
Hash: theHash[:],
Time: time.Now(),
Txs: [][]byte{mockTx.Bytes()},
}
// Start optimistic execution
resp, err := c.ProcessProposal(context.Background(), ppReq)
require.NoError(t, err)
require.Equal(t, resp.Status, abciproto.PROCESS_PROPOSAL_STATUS_ACCEPT)
// Initialize FinalizeBlock with correct hash - should use optimistic result
theHash = sha256.Sum256([]byte("test"))
fbReq := &abciproto.FinalizeBlockRequest{
Height: 2,
Hash: theHash[:],
Time: ppReq.Time,
Txs: ppReq.Txs,
}
fbResp, err := c.FinalizeBlock(context.Background(), fbReq)
require.Error(t, err)
require.ErrorContains(t, err, "test error") // from optimisticMockFunc
require.Equal(t, 1, calledTimes)
resp, err = c.ProcessProposal(context.Background(), ppReq)
require.NoError(t, err)
require.Equal(t, resp.Status, abciproto.PROCESS_PROPOSAL_STATUS_ACCEPT)
theWrongHash := sha256.Sum256([]byte("wrong_hash"))
fbReq.Hash = theWrongHash[:]
// Initialize FinalizeBlock with wrong hash - should abort optimistic execution
// Because is aborted, the result comes from the normal execution
fbResp, err = c.FinalizeBlock(context.Background(), fbReq)
require.NotNil(t, fbResp)
require.NoError(t, err)
require.Equal(t, 2, calledTimes)
// Verify optimistic execution was reset
require.False(t, c.optimisticExec.Initialized())
}

View File

@ -0,0 +1,169 @@
package oe
import (
"bytes"
"context"
"encoding/hex"
"math/rand"
"sync"
"time"
abci "github.com/cometbft/cometbft/api/cometbft/abci/v1"
"cosmossdk.io/core/server"
"cosmossdk.io/core/store"
"cosmossdk.io/core/transaction"
"cosmossdk.io/log"
)
// FinalizeBlockFunc is the function that is called by the OE to finalize the
// block. It is the same as the one in the ABCI app.
type FinalizeBlockFunc[T transaction.Tx] func(context.Context, *abci.FinalizeBlockRequest) (*server.BlockResponse, store.WriterMap, []T, error)
// OptimisticExecution is a struct that contains the OE context. It is used to
// run the FinalizeBlock function in a goroutine, and to abort it if needed.
type OptimisticExecution[T transaction.Tx] struct {
finalizeBlockFunc FinalizeBlockFunc[T] // ABCI FinalizeBlock function with a context
logger log.Logger
mtx sync.Mutex
stopCh chan struct{}
request *abci.FinalizeBlockRequest
response *FinalizeBlockResponse[T]
err error
cancelFunc func() // cancel function for the context
initialized bool // A boolean value indicating whether the struct has been initialized
// debugging/testing options
abortRate int // number from 0 to 100 that determines the percentage of OE that should be aborted
}
type FinalizeBlockResponse[T transaction.Tx] struct {
Resp *server.BlockResponse
StateChanges store.WriterMap
DecodedTxs []T
}
// NewOptimisticExecution initializes the Optimistic Execution context but does not start it.
func NewOptimisticExecution[T transaction.Tx](logger log.Logger, fn FinalizeBlockFunc[T], opts ...func(*OptimisticExecution[T])) *OptimisticExecution[T] {
logger = logger.With(log.ModuleKey, "oe")
oe := &OptimisticExecution[T]{logger: logger, finalizeBlockFunc: fn}
for _, opt := range opts {
opt(oe)
}
return oe
}
// WithAbortRate sets the abort rate for the OE. The abort rate is a number from
// 0 to 100 that determines the percentage of OE that should be aborted.
// This is for testing purposes only and must not be used in production.
func WithAbortRate[T transaction.Tx](rate int) func(*OptimisticExecution[T]) {
return func(oe *OptimisticExecution[T]) {
oe.abortRate = rate
}
}
// Reset resets the OE context. Must be called whenever we want to invalidate
// the current OE.
func (oe *OptimisticExecution[T]) Reset() {
oe.mtx.Lock()
defer oe.mtx.Unlock()
oe.request = nil
oe.response = nil
oe.err = nil
oe.initialized = false
}
// Initialized returns true if the OE was initialized, meaning that it contains
// a request and it was run or it is running.
func (oe *OptimisticExecution[T]) Initialized() bool {
if oe == nil {
return false
}
oe.mtx.Lock()
defer oe.mtx.Unlock()
return oe.initialized
}
// Execute initializes the OE and starts it in a goroutine.
func (oe *OptimisticExecution[T]) Execute(req *abci.ProcessProposalRequest) {
oe.mtx.Lock()
defer oe.mtx.Unlock()
oe.stopCh = make(chan struct{})
oe.request = &abci.FinalizeBlockRequest{
Txs: req.Txs,
DecidedLastCommit: req.ProposedLastCommit,
Misbehavior: req.Misbehavior,
Hash: req.Hash,
Height: req.Height,
Time: req.Time,
NextValidatorsHash: req.NextValidatorsHash,
ProposerAddress: req.ProposerAddress,
}
oe.logger.Debug("OE started", "height", req.Height, "hash", hex.EncodeToString(req.Hash), "time", req.Time.String())
ctx, cancel := context.WithCancel(context.Background())
oe.cancelFunc = cancel
oe.initialized = true
go func() {
start := time.Now()
resp, stateChanges, decodedTxs, err := oe.finalizeBlockFunc(ctx, oe.request)
oe.mtx.Lock()
executionTime := time.Since(start)
oe.logger.Debug("OE finished", "duration", executionTime.String(), "height", oe.request.Height, "hash", hex.EncodeToString(oe.request.Hash))
oe.response, oe.err = &FinalizeBlockResponse[T]{
Resp: resp,
StateChanges: stateChanges,
DecodedTxs: decodedTxs,
}, err
close(oe.stopCh)
oe.mtx.Unlock()
}()
}
// AbortIfNeeded aborts the OE if the request hash is not the same as the one in
// the running OE. Returns true if the OE was aborted.
func (oe *OptimisticExecution[T]) AbortIfNeeded(reqHash []byte) bool {
if oe == nil {
return false
}
oe.mtx.Lock()
defer oe.mtx.Unlock()
if !bytes.Equal(oe.request.Hash, reqHash) {
oe.logger.Error("OE aborted due to hash mismatch", "oe_hash", hex.EncodeToString(oe.request.Hash), "req_hash", hex.EncodeToString(reqHash), "oe_height", oe.request.Height, "req_height", oe.request.Height)
oe.cancelFunc()
return true
} else if oe.abortRate > 0 && rand.Intn(100) < oe.abortRate {
// this is for test purposes only, we can emulate a certain percentage of
// OE needed to be aborted.
oe.cancelFunc()
oe.logger.Error("OE aborted due to test abort rate")
return true
}
return false
}
// Abort aborts the OE unconditionally and waits for it to finish.
func (oe *OptimisticExecution[T]) Abort() {
if oe == nil || oe.cancelFunc == nil {
return
}
oe.cancelFunc()
<-oe.stopCh
}
// WaitResult waits for the OE to finish and returns the result.
func (oe *OptimisticExecution[T]) WaitResult() (*FinalizeBlockResponse[T], error) {
<-oe.stopCh
return oe.response, oe.err
}

View File

@ -0,0 +1,35 @@
package oe
import (
"context"
"errors"
"testing"
"cosmossdk.io/core/server"
"cosmossdk.io/core/store"
"cosmossdk.io/core/transaction"
"cosmossdk.io/log"
abci "github.com/cometbft/cometbft/api/cometbft/abci/v1"
"github.com/stretchr/testify/assert"
)
func testFinalizeBlock[T transaction.Tx](context.Context, *abci.FinalizeBlockRequest) (*server.BlockResponse, store.WriterMap, []T, error) {
return nil, nil, nil, errors.New("test error")
}
func TestOptimisticExecution(t *testing.T) {
oe := NewOptimisticExecution[transaction.Tx](log.NewNopLogger(), testFinalizeBlock)
oe.Execute(&abci.ProcessProposalRequest{
Hash: []byte("test"),
})
assert.True(t, oe.Initialized())
resp, err := oe.WaitResult()
assert.Equal(t, &FinalizeBlockResponse[transaction.Tx]{}, resp) // empty response
assert.EqualError(t, err, "test error")
assert.False(t, oe.AbortIfNeeded([]byte("test")))
assert.True(t, oe.AbortIfNeeded([]byte("wrong_hash")))
oe.Reset()
}

View File

@ -2,6 +2,7 @@ package cometbft
import (
"context"
"cosmossdk.io/server/v2/cometbft/oe"
"crypto/sha256"
"encoding/json"
"fmt"
@ -164,7 +165,7 @@ func New[T transaction.Tx](
}
}
srv.Consensus = &consensus[T]{
c := &consensus[T]{
appName: appName,
version: getCometBFTServerVersion(),
app: app,
@ -192,6 +193,13 @@ func New[T transaction.Tx](
idPeerFilter: srv.serverOptions.IdPeerFilter,
}
c.optimisticExec = oe.NewOptimisticExecution(
logger,
c.internalFinalizeBlock,
)
srv.Consensus = c
return srv, nil
}