refactor(runtime/v2): untie runtimev2 from the legacy usage of gRPC (#22043)
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
This commit is contained in:
parent
947ffe0131
commit
0f63adee90
@ -5,9 +5,8 @@ import (
|
||||
"errors"
|
||||
"slices"
|
||||
|
||||
gogoproto "github.com/cosmos/gogoproto/proto"
|
||||
|
||||
runtimev2 "cosmossdk.io/api/cosmos/app/runtime/v2"
|
||||
appmodulev2 "cosmossdk.io/core/appmodule/v2"
|
||||
"cosmossdk.io/core/registry"
|
||||
"cosmossdk.io/core/transaction"
|
||||
"cosmossdk.io/log"
|
||||
@ -43,9 +42,8 @@ type App[T transaction.Tx] struct {
|
||||
amino registry.AminoRegistrar
|
||||
moduleManager *MM[T]
|
||||
|
||||
// GRPCMethodsToMessageMap maps gRPC method name to a function that decodes the request
|
||||
// bytes into a gogoproto.Message, which then can be passed to appmanager.
|
||||
GRPCMethodsToMessageMap map[string]func() gogoproto.Message
|
||||
// QueryHandlers defines the query handlers
|
||||
QueryHandlers map[string]appmodulev2.Handler
|
||||
|
||||
storeLoader StoreLoader
|
||||
}
|
||||
@ -120,6 +118,6 @@ func (a *App[T]) GetAppManager() *appmanager.AppManager[T] {
|
||||
return a.AppManager
|
||||
}
|
||||
|
||||
func (a *App[T]) GetGPRCMethodsToMessageMap() map[string]func() gogoproto.Message {
|
||||
return a.GRPCMethodsToMessageMap
|
||||
func (a *App[T]) GetQueryHandlers() map[string]appmodulev2.Handler {
|
||||
return a.QueryHandlers
|
||||
}
|
||||
|
||||
@ -615,46 +615,46 @@ func (m *MM[T]) assertNoForgottenModules(
|
||||
}
|
||||
|
||||
func registerServices[T transaction.Tx](s appmodulev2.AppModule, app *App[T], registry *protoregistry.Files) error {
|
||||
c := &configurator{
|
||||
grpcQueryDecoders: map[string]func() gogoproto.Message{},
|
||||
stfQueryRouter: app.queryRouterBuilder,
|
||||
stfMsgRouter: app.msgRouterBuilder,
|
||||
registry: registry,
|
||||
err: nil,
|
||||
}
|
||||
|
||||
// case module with services
|
||||
if services, ok := s.(hasServicesV1); ok {
|
||||
c := &configurator{
|
||||
queryHandlers: map[string]appmodulev2.Handler{},
|
||||
stfQueryRouter: app.queryRouterBuilder,
|
||||
stfMsgRouter: app.msgRouterBuilder,
|
||||
registry: registry,
|
||||
err: nil,
|
||||
}
|
||||
if err := services.RegisterServices(c); err != nil {
|
||||
return fmt.Errorf("unable to register services: %w", err)
|
||||
}
|
||||
} else {
|
||||
// If module not implement RegisterServices, register msg & query handler.
|
||||
if module, ok := s.(appmodulev2.HasMsgHandlers); ok {
|
||||
wrapper := stfRouterWrapper{stfRouter: app.msgRouterBuilder}
|
||||
module.RegisterMsgHandlers(&wrapper)
|
||||
if wrapper.error != nil {
|
||||
return fmt.Errorf("unable to register handlers: %w", wrapper.error)
|
||||
}
|
||||
|
||||
if c.err != nil {
|
||||
app.logger.Warn("error registering services", "error", c.err)
|
||||
}
|
||||
|
||||
if module, ok := s.(appmodulev2.HasQueryHandlers); ok {
|
||||
wrapper := stfRouterWrapper{stfRouter: app.msgRouterBuilder}
|
||||
module.RegisterQueryHandlers(&wrapper)
|
||||
|
||||
for path, decoder := range wrapper.decoders {
|
||||
app.GRPCMethodsToMessageMap[path] = decoder
|
||||
}
|
||||
// merge maps
|
||||
for path, decoder := range c.queryHandlers {
|
||||
app.QueryHandlers[path] = decoder
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
if c.err != nil {
|
||||
app.logger.Warn("error registering services", "error", c.err)
|
||||
// if module implements register msg handlers
|
||||
if module, ok := s.(appmodulev2.HasMsgHandlers); ok {
|
||||
wrapper := stfRouterWrapper{stfRouter: app.msgRouterBuilder}
|
||||
module.RegisterMsgHandlers(&wrapper)
|
||||
if wrapper.error != nil {
|
||||
return fmt.Errorf("unable to register handlers: %w", wrapper.error)
|
||||
}
|
||||
}
|
||||
|
||||
// merge maps
|
||||
for path, decoder := range c.grpcQueryDecoders {
|
||||
app.GRPCMethodsToMessageMap[path] = decoder
|
||||
// if module implements register query handlers
|
||||
if module, ok := s.(appmodulev2.HasQueryHandlers); ok {
|
||||
wrapper := stfRouterWrapper{stfRouter: app.queryRouterBuilder}
|
||||
module.RegisterQueryHandlers(&wrapper)
|
||||
|
||||
for path, handler := range wrapper.handlers {
|
||||
app.QueryHandlers[path] = handler
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
@ -663,9 +663,7 @@ func registerServices[T transaction.Tx](s appmodulev2.AppModule, app *App[T], re
|
||||
var _ grpc.ServiceRegistrar = (*configurator)(nil)
|
||||
|
||||
type configurator struct {
|
||||
// grpcQueryDecoders is required because module expose queries through gRPC
|
||||
// this provides a way to route to modules using gRPC.
|
||||
grpcQueryDecoders map[string]func() gogoproto.Message
|
||||
queryHandlers map[string]appmodulev2.Handler
|
||||
|
||||
stfQueryRouter *stf.MsgRouterBuilder
|
||||
stfMsgRouter *stf.MsgRouterBuilder
|
||||
@ -697,28 +695,31 @@ func (c *configurator) RegisterService(sd *grpc.ServiceDesc, ss interface{}) {
|
||||
func (c *configurator) registerQueryHandlers(sd *grpc.ServiceDesc, ss interface{}) error {
|
||||
for _, md := range sd.Methods {
|
||||
// TODO(tip): what if a query is not deterministic?
|
||||
requestFullName, err := registerMethod(c.stfQueryRouter, sd, md, ss)
|
||||
|
||||
handler, err := grpcHandlerToAppModuleHandler(sd, md, ss)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to register query handler %s.%s: %w", sd.ServiceName, md.MethodName, err)
|
||||
return fmt.Errorf("unable to make a appmodulev2.HandlerFunc from gRPC handler (%s, %s): %w", sd.ServiceName, md.MethodName, err)
|
||||
}
|
||||
|
||||
// register gRPC query method.
|
||||
typ := gogoproto.MessageType(requestFullName)
|
||||
if typ == nil {
|
||||
return fmt.Errorf("unable to find message in gogotype registry: %w", err)
|
||||
// register to stf query router.
|
||||
err = c.stfQueryRouter.RegisterHandler(gogoproto.MessageName(handler.MakeMsg()), handler.Func)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to register handler to stf router (%s, %s): %w", sd.ServiceName, md.MethodName, err)
|
||||
}
|
||||
decoderFunc := func() gogoproto.Message {
|
||||
return reflect.New(typ.Elem()).Interface().(gogoproto.Message)
|
||||
}
|
||||
methodName := fmt.Sprintf("/%s/%s", sd.ServiceName, md.MethodName)
|
||||
c.grpcQueryDecoders[methodName] = decoderFunc
|
||||
|
||||
// register query handler using the same mapping used in stf
|
||||
c.queryHandlers[gogoproto.MessageName(handler.MakeMsg())] = handler
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *configurator) registerMsgHandlers(sd *grpc.ServiceDesc, ss interface{}) error {
|
||||
for _, md := range sd.Methods {
|
||||
_, err := registerMethod(c.stfMsgRouter, sd, md, ss)
|
||||
handler, err := grpcHandlerToAppModuleHandler(sd, md, ss)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = c.stfMsgRouter.RegisterHandler(gogoproto.MessageName(handler.MakeMsg()), handler.Func)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to register msg handler %s.%s: %w", sd.ServiceName, md.MethodName, err)
|
||||
}
|
||||
@ -726,32 +727,27 @@ func (c *configurator) registerMsgHandlers(sd *grpc.ServiceDesc, ss interface{})
|
||||
return nil
|
||||
}
|
||||
|
||||
// requestFullNameFromMethodDesc returns the fully-qualified name of the request message of the provided service's method.
|
||||
func requestFullNameFromMethodDesc(sd *grpc.ServiceDesc, method grpc.MethodDesc) (protoreflect.FullName, error) {
|
||||
methodFullName := protoreflect.FullName(fmt.Sprintf("%s.%s", sd.ServiceName, method.MethodName))
|
||||
desc, err := gogoproto.HybridResolver.FindDescriptorByName(methodFullName)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("cannot find method descriptor %s", methodFullName)
|
||||
}
|
||||
methodDesc, ok := desc.(protoreflect.MethodDescriptor)
|
||||
if !ok {
|
||||
return "", fmt.Errorf("invalid method descriptor %s", methodFullName)
|
||||
}
|
||||
return methodDesc.Input().FullName(), nil
|
||||
}
|
||||
|
||||
func registerMethod(
|
||||
stfRouter *stf.MsgRouterBuilder,
|
||||
// grpcHandlerToAppModuleHandler converts a gRPC handler into an appmodulev2.HandlerFunc.
|
||||
func grpcHandlerToAppModuleHandler(
|
||||
sd *grpc.ServiceDesc,
|
||||
md grpc.MethodDesc,
|
||||
ss interface{},
|
||||
) (string, error) {
|
||||
requestName, err := requestFullNameFromMethodDesc(sd, md)
|
||||
) (appmodulev2.Handler, error) {
|
||||
requestName, responseName, err := requestFullNameFromMethodDesc(sd, md)
|
||||
if err != nil {
|
||||
return "", err
|
||||
return appmodulev2.Handler{}, err
|
||||
}
|
||||
|
||||
return string(requestName), stfRouter.RegisterHandler(string(requestName), func(
|
||||
requestTyp := gogoproto.MessageType(string(requestName))
|
||||
if requestTyp == nil {
|
||||
return appmodulev2.Handler{}, fmt.Errorf("no proto message found for %s", requestName)
|
||||
}
|
||||
responseTyp := gogoproto.MessageType(string(responseName))
|
||||
if responseTyp == nil {
|
||||
return appmodulev2.Handler{}, fmt.Errorf("no proto message found for %s", responseName)
|
||||
}
|
||||
|
||||
handlerFunc := func(
|
||||
ctx context.Context,
|
||||
msg transaction.Msg,
|
||||
) (resp transaction.Msg, err error) {
|
||||
@ -760,7 +756,17 @@ func registerMethod(
|
||||
return nil, err
|
||||
}
|
||||
return res.(transaction.Msg), nil
|
||||
})
|
||||
}
|
||||
|
||||
return appmodulev2.Handler{
|
||||
Func: handlerFunc,
|
||||
MakeMsg: func() transaction.Msg {
|
||||
return reflect.New(requestTyp.Elem()).Interface().(transaction.Msg)
|
||||
},
|
||||
MakeMsgResp: func() transaction.Msg {
|
||||
return reflect.New(responseTyp.Elem()).Interface().(transaction.Msg)
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
func noopDecoder(_ interface{}) error { return nil }
|
||||
@ -776,6 +782,20 @@ func messagePassingInterceptor(msg transaction.Msg) grpc.UnaryServerInterceptor
|
||||
}
|
||||
}
|
||||
|
||||
// requestFullNameFromMethodDesc returns the fully-qualified name of the request message and response of the provided service's method.
|
||||
func requestFullNameFromMethodDesc(sd *grpc.ServiceDesc, method grpc.MethodDesc) (protoreflect.FullName, protoreflect.FullName, error) {
|
||||
methodFullName := protoreflect.FullName(fmt.Sprintf("%s.%s", sd.ServiceName, method.MethodName))
|
||||
desc, err := gogoproto.HybridResolver.FindDescriptorByName(methodFullName)
|
||||
if err != nil {
|
||||
return "", "", fmt.Errorf("cannot find method descriptor %s", methodFullName)
|
||||
}
|
||||
methodDesc, ok := desc.(protoreflect.MethodDescriptor)
|
||||
if !ok {
|
||||
return "", "", fmt.Errorf("invalid method descriptor %s", methodFullName)
|
||||
}
|
||||
return methodDesc.Input().FullName(), methodDesc.Output().FullName(), nil
|
||||
}
|
||||
|
||||
// defaultMigrationsOrder returns a default migrations order: ascending alphabetical by module name,
|
||||
// except x/auth which will run last, see:
|
||||
// https://github.com/cosmos/cosmos-sdk/issues/10591
|
||||
@ -815,7 +835,7 @@ type stfRouterWrapper struct {
|
||||
|
||||
error error
|
||||
|
||||
decoders map[string]func() gogoproto.Message
|
||||
handlers map[string]appmodulev2.Handler
|
||||
}
|
||||
|
||||
func (s *stfRouterWrapper) RegisterHandler(handler appmodulev2.Handler) {
|
||||
@ -831,7 +851,7 @@ func (s *stfRouterWrapper) RegisterHandler(handler appmodulev2.Handler) {
|
||||
|
||||
// also make the decoder
|
||||
if s.error == nil {
|
||||
s.decoders = map[string]func() gogoproto.Message{}
|
||||
s.handlers = map[string]appmodulev2.Handler{}
|
||||
}
|
||||
s.decoders[requestName] = handler.MakeMsg
|
||||
s.handlers[requestName] = handler
|
||||
}
|
||||
|
||||
@ -127,13 +127,13 @@ func ProvideAppBuilder[T transaction.Tx](
|
||||
|
||||
msgRouterBuilder := stf.NewMsgRouterBuilder()
|
||||
app := &App[T]{
|
||||
storeKeys: nil,
|
||||
interfaceRegistrar: interfaceRegistrar,
|
||||
amino: amino,
|
||||
msgRouterBuilder: msgRouterBuilder,
|
||||
queryRouterBuilder: stf.NewMsgRouterBuilder(), // TODO dedicated query router
|
||||
GRPCMethodsToMessageMap: map[string]func() proto.Message{},
|
||||
storeLoader: DefaultStoreLoader,
|
||||
storeKeys: nil,
|
||||
interfaceRegistrar: interfaceRegistrar,
|
||||
amino: amino,
|
||||
msgRouterBuilder: msgRouterBuilder,
|
||||
queryRouterBuilder: stf.NewMsgRouterBuilder(), // TODO dedicated query router
|
||||
QueryHandlers: map[string]appmodulev2.Handler{},
|
||||
storeLoader: DefaultStoreLoader,
|
||||
}
|
||||
appBuilder := &AppBuilder[T]{app: app}
|
||||
|
||||
|
||||
@ -9,14 +9,18 @@ import (
|
||||
"net"
|
||||
"slices"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/cosmos/gogoproto/proto"
|
||||
gogoproto "github.com/cosmos/gogoproto/proto"
|
||||
"github.com/spf13/pflag"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/metadata"
|
||||
"google.golang.org/grpc/status"
|
||||
"google.golang.org/protobuf/reflect/protoreflect"
|
||||
|
||||
appmodulev2 "cosmossdk.io/core/appmodule/v2"
|
||||
"cosmossdk.io/core/transaction"
|
||||
"cosmossdk.io/log"
|
||||
serverv2 "cosmossdk.io/server/v2"
|
||||
@ -53,7 +57,7 @@ func (s *Server[T]) Init(appI serverv2.AppI[T], cfg map[string]any, logger log.L
|
||||
return fmt.Errorf("failed to unmarshal config: %w", err)
|
||||
}
|
||||
}
|
||||
methodsMap := appI.GetGPRCMethodsToMessageMap()
|
||||
methodsMap := appI.GetQueryHandlers()
|
||||
|
||||
grpcSrv := grpc.NewServer(
|
||||
grpc.ForceServerCodec(newProtoCodec(appI.InterfaceRegistry()).GRPCCodec()),
|
||||
@ -80,21 +84,40 @@ func (s *Server[T]) StartCmdFlags() *pflag.FlagSet {
|
||||
return flags
|
||||
}
|
||||
|
||||
func makeUnknownServiceHandler(messageMap map[string]func() proto.Message, querier interface {
|
||||
Query(ctx context.Context, version uint64, msg proto.Message) (proto.Message, error)
|
||||
func makeUnknownServiceHandler(handlers map[string]appmodulev2.Handler, querier interface {
|
||||
Query(ctx context.Context, version uint64, msg gogoproto.Message) (gogoproto.Message, error)
|
||||
},
|
||||
) grpc.StreamHandler {
|
||||
getRegistry := sync.OnceValues(gogoproto.MergedRegistry)
|
||||
|
||||
return func(srv any, stream grpc.ServerStream) error {
|
||||
method, ok := grpc.MethodFromServerStream(stream)
|
||||
if !ok {
|
||||
return status.Error(codes.InvalidArgument, "unable to get method")
|
||||
}
|
||||
makeMsg, exists := messageMap[method]
|
||||
// if this fails we cannot serve queries anymore...
|
||||
registry, err := getRegistry()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get registry: %w", err)
|
||||
}
|
||||
fullName := protoreflect.FullName(strings.ReplaceAll(method, "/", "."))
|
||||
// get descriptor from the invoke method
|
||||
desc, err := registry.FindDescriptorByName(fullName)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to find descriptor %s: %w", method, err)
|
||||
}
|
||||
md, ok := desc.(protoreflect.MethodDescriptor)
|
||||
if !ok {
|
||||
return fmt.Errorf("%s is not a method", method)
|
||||
}
|
||||
// find handler
|
||||
handler, exists := handlers[string(md.Input().FullName())]
|
||||
if !exists {
|
||||
return status.Errorf(codes.Unimplemented, "gRPC method %s is not handled", method)
|
||||
}
|
||||
|
||||
for {
|
||||
req := makeMsg()
|
||||
req := handler.MakeMsg()
|
||||
err := stream.RecvMsg(req)
|
||||
if err != nil {
|
||||
if errors.Is(err, io.EOF) {
|
||||
|
||||
@ -5,13 +5,18 @@ import (
|
||||
"crypto/sha256"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
abci "github.com/cometbft/cometbft/abci/types"
|
||||
abciproto "github.com/cometbft/cometbft/api/cometbft/abci/v1"
|
||||
gogoproto "github.com/cosmos/gogoproto/proto"
|
||||
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
|
||||
"google.golang.org/protobuf/reflect/protoregistry"
|
||||
|
||||
"cosmossdk.io/collections"
|
||||
appmodulev2 "cosmossdk.io/core/appmodule/v2"
|
||||
"cosmossdk.io/core/comet"
|
||||
corecontext "cosmossdk.io/core/context"
|
||||
"cosmossdk.io/core/event"
|
||||
@ -63,7 +68,8 @@ type Consensus[T transaction.Tx] struct {
|
||||
addrPeerFilter types.PeerFilter // filter peers by address and port
|
||||
idPeerFilter types.PeerFilter // filter peers by node ID
|
||||
|
||||
grpcMethodsMap map[string]func() transaction.Msg // maps gRPC method to message creator func
|
||||
queryHandlersMap map[string]appmodulev2.Handler
|
||||
getProtoRegistry func() (*protoregistry.Files, error)
|
||||
}
|
||||
|
||||
func NewConsensus[T transaction.Tx](
|
||||
@ -72,7 +78,7 @@ func NewConsensus[T transaction.Tx](
|
||||
app *appmanager.AppManager[T],
|
||||
mp mempool.Mempool[T],
|
||||
indexedEvents map[string]struct{},
|
||||
gRPCMethodsMap map[string]func() transaction.Msg,
|
||||
queryHandlersMap map[string]appmodulev2.Handler,
|
||||
store types.Store,
|
||||
cfg Config,
|
||||
txCodec transaction.Codec[T],
|
||||
@ -81,7 +87,6 @@ func NewConsensus[T transaction.Tx](
|
||||
return &Consensus[T]{
|
||||
appName: appName,
|
||||
version: getCometBFTServerVersion(),
|
||||
grpcMethodsMap: gRPCMethodsMap,
|
||||
app: app,
|
||||
cfg: cfg,
|
||||
store: store,
|
||||
@ -98,6 +103,8 @@ func NewConsensus[T transaction.Tx](
|
||||
chainID: chainId,
|
||||
indexedEvents: indexedEvents,
|
||||
initialHeight: 0,
|
||||
queryHandlersMap: queryHandlersMap,
|
||||
getProtoRegistry: sync.OnceValues(func() (*protoregistry.Files, error) { return gogoproto.MergedRegistry() }),
|
||||
}
|
||||
}
|
||||
|
||||
@ -198,23 +205,9 @@ func (c *Consensus[T]) Info(ctx context.Context, _ *abciproto.InfoRequest) (*abc
|
||||
// Query implements types.Application.
|
||||
// It is called by cometbft to query application state.
|
||||
func (c *Consensus[T]) Query(ctx context.Context, req *abciproto.QueryRequest) (resp *abciproto.QueryResponse, err error) {
|
||||
// check if it's a gRPC method
|
||||
makeGRPCRequest, isGRPC := c.grpcMethodsMap[req.Path]
|
||||
resp, isGRPC, err := c.maybeRunGRPCQuery(ctx, req)
|
||||
if isGRPC {
|
||||
protoRequest := makeGRPCRequest()
|
||||
err = gogoproto.Unmarshal(req.Data, protoRequest) // TODO: use codec
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to decode gRPC request with path %s from ABCI.Query: %w", req.Path, err)
|
||||
}
|
||||
res, err := c.app.Query(ctx, uint64(req.Height), protoRequest)
|
||||
if err != nil {
|
||||
resp := QueryResult(err, c.cfg.AppTomlConfig.Trace)
|
||||
resp.Height = req.Height
|
||||
return resp, err
|
||||
|
||||
}
|
||||
|
||||
return queryResponse(res, req.Height)
|
||||
return resp, err
|
||||
}
|
||||
|
||||
// this error most probably means that we can't handle it with a proto message, so
|
||||
@ -245,6 +238,49 @@ func (c *Consensus[T]) Query(ctx context.Context, req *abciproto.QueryRequest) (
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (c *Consensus[T]) maybeRunGRPCQuery(ctx context.Context, req *abci.QueryRequest) (resp *abciproto.QueryResponse, isGRPC bool, err error) {
|
||||
// if this fails then we cannot serve queries anymore
|
||||
registry, err := c.getProtoRegistry()
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
// in order to check if it's a gRPC query we ensure that there's a descriptor
|
||||
// for the path, if such descriptor exists, and it is a method descriptor
|
||||
// then we assume this is a gRPC query.
|
||||
fullName := protoreflect.FullName(strings.ReplaceAll(req.Path, "/", "."))
|
||||
|
||||
desc, err := registry.FindDescriptorByName(fullName)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
md, isGRPC := desc.(protoreflect.MethodDescriptor)
|
||||
if !isGRPC {
|
||||
return nil, false, nil
|
||||
}
|
||||
|
||||
handler, found := c.queryHandlersMap[string(md.Input().FullName())]
|
||||
if !found {
|
||||
return nil, true, fmt.Errorf("no query handler found for %s", fullName)
|
||||
}
|
||||
protoRequest := handler.MakeMsg()
|
||||
err = gogoproto.Unmarshal(req.Data, protoRequest) // TODO: use codec
|
||||
if err != nil {
|
||||
return nil, true, fmt.Errorf("unable to decode gRPC request with path %s from ABCI.Query: %w", req.Path, err)
|
||||
}
|
||||
res, err := c.app.Query(ctx, uint64(req.Height), protoRequest)
|
||||
if err != nil {
|
||||
resp := QueryResult(err, c.cfg.AppTomlConfig.Trace)
|
||||
resp.Height = req.Height
|
||||
return resp, true, err
|
||||
|
||||
}
|
||||
|
||||
resp, err = queryResponse(res, req.Height)
|
||||
return resp, isGRPC, err
|
||||
}
|
||||
|
||||
// InitChain implements types.Application.
|
||||
func (c *Consensus[T]) InitChain(ctx context.Context, req *abciproto.InitChainRequest) (*abciproto.InitChainResponse, error) {
|
||||
c.logger.Info("InitChain", "initialHeight", req.InitialHeight, "chainID", req.ChainId)
|
||||
|
||||
@ -36,6 +36,7 @@ require (
|
||||
github.com/spf13/cobra v1.8.1
|
||||
github.com/spf13/pflag v1.0.5
|
||||
github.com/stretchr/testify v1.9.0
|
||||
google.golang.org/protobuf v1.34.2
|
||||
sigs.k8s.io/yaml v1.4.0
|
||||
)
|
||||
|
||||
@ -179,7 +180,6 @@ require (
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20240814211410-ddb44dafa142 // indirect
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20240930140551-af27646dc61f // indirect
|
||||
google.golang.org/grpc v1.67.1 // indirect
|
||||
google.golang.org/protobuf v1.34.2 // indirect
|
||||
gopkg.in/ini.v1 v1.67.0 // indirect
|
||||
gopkg.in/yaml.v3 v3.0.1 // indirect
|
||||
gotest.tools/v3 v3.5.1 // indirect
|
||||
|
||||
@ -105,7 +105,7 @@ func (s *CometBFTServer[T]) Init(appI serverv2.AppI[T], cfg map[string]any, logg
|
||||
appI.GetAppManager(),
|
||||
s.serverOptions.Mempool(cfg),
|
||||
indexEvents,
|
||||
appI.GetGPRCMethodsToMessageMap(),
|
||||
appI.GetQueryHandlers(),
|
||||
store,
|
||||
s.config,
|
||||
s.initTxCodec,
|
||||
|
||||
@ -13,7 +13,7 @@ replace (
|
||||
|
||||
require (
|
||||
cosmossdk.io/api v0.7.6
|
||||
cosmossdk.io/core v1.0.0-alpha.3
|
||||
cosmossdk.io/core v1.0.0-alpha.4
|
||||
cosmossdk.io/core/testing v0.0.0-20240923163230-04da382a9f29
|
||||
cosmossdk.io/log v1.4.1
|
||||
cosmossdk.io/server/v2/appmanager v0.0.0-00010101000000-000000000000
|
||||
|
||||
@ -1,7 +1,7 @@
|
||||
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
|
||||
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
|
||||
cosmossdk.io/core v1.0.0-alpha.3 h1:pnxaYAas7llXgVz1lM7X6De74nWrhNKnB3yMKe4OUUA=
|
||||
cosmossdk.io/core v1.0.0-alpha.3/go.mod h1:3u9cWq1FAVtiiCrDPpo4LhR+9V6k/ycSG4/Y/tREWCY=
|
||||
cosmossdk.io/core v1.0.0-alpha.4 h1:9iuroT9ejDYETCsGkzkvs/wAY/5UFl7nCIINFRxyMJY=
|
||||
cosmossdk.io/core v1.0.0-alpha.4/go.mod h1:3u9cWq1FAVtiiCrDPpo4LhR+9V6k/ycSG4/Y/tREWCY=
|
||||
cosmossdk.io/core/testing v0.0.0-20240923163230-04da382a9f29 h1:NxxUo0GMJUbIuVg0R70e3cbn9eFTEuMr7ev1AFvypdY=
|
||||
cosmossdk.io/core/testing v0.0.0-20240923163230-04da382a9f29/go.mod h1:8s2tPeJtSiQuoyPmr2Ag7meikonISO4Fv4MoO8+ORrs=
|
||||
cosmossdk.io/errors/v2 v2.0.0-20240731132947-df72853b3ca5 h1:IQNdY2kB+k+1OM2DvqFG1+UgeU1JzZrWtwuWzI3ZfwA=
|
||||
|
||||
@ -11,6 +11,7 @@ import (
|
||||
"github.com/spf13/viper"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
appmodulev2 "cosmossdk.io/core/appmodule/v2"
|
||||
coreserver "cosmossdk.io/core/server"
|
||||
"cosmossdk.io/core/transaction"
|
||||
"cosmossdk.io/log"
|
||||
@ -35,8 +36,8 @@ type mockApp[T transaction.Tx] struct {
|
||||
serverv2.AppI[T]
|
||||
}
|
||||
|
||||
func (*mockApp[T]) GetGPRCMethodsToMessageMap() map[string]func() gogoproto.Message {
|
||||
return map[string]func() gogoproto.Message{}
|
||||
func (*mockApp[T]) GetQueryHandlers() map[string]appmodulev2.Handler {
|
||||
return map[string]appmodulev2.Handler{}
|
||||
}
|
||||
|
||||
func (*mockApp[T]) GetAppManager() *appmanager.AppManager[T] {
|
||||
|
||||
@ -1,9 +1,9 @@
|
||||
package serverv2
|
||||
|
||||
import (
|
||||
gogoproto "github.com/cosmos/gogoproto/proto"
|
||||
"github.com/spf13/viper"
|
||||
|
||||
appmodulev2 "cosmossdk.io/core/appmodule/v2"
|
||||
"cosmossdk.io/core/server"
|
||||
"cosmossdk.io/core/transaction"
|
||||
"cosmossdk.io/log"
|
||||
@ -16,6 +16,6 @@ type AppI[T transaction.Tx] interface {
|
||||
Name() string
|
||||
InterfaceRegistry() server.InterfaceRegistry
|
||||
GetAppManager() *appmanager.AppManager[T]
|
||||
GetGPRCMethodsToMessageMap() map[string]func() gogoproto.Message
|
||||
GetQueryHandlers() map[string]appmodulev2.Handler
|
||||
GetStore() any
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user