Co-authored-by: testinginprod <98415576+testinginprod@users.noreply.github.com>
This commit is contained in:
parent
4fed7392c6
commit
d2987a5b77
@ -1,13 +1,18 @@
|
||||
package baseapp
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
abci "github.com/cometbft/cometbft/abci/types"
|
||||
gogogrpc "github.com/cosmos/gogoproto/grpc"
|
||||
"github.com/cosmos/gogoproto/proto"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/encoding"
|
||||
"google.golang.org/protobuf/reflect/protoreflect"
|
||||
"google.golang.org/protobuf/runtime/protoiface"
|
||||
|
||||
"github.com/cosmos/cosmos-sdk/baseapp/internal/protocompat"
|
||||
"github.com/cosmos/cosmos-sdk/client/grpc/reflection"
|
||||
"github.com/cosmos/cosmos-sdk/codec"
|
||||
codectypes "github.com/cosmos/cosmos-sdk/codec/types"
|
||||
@ -16,8 +21,16 @@ import (
|
||||
|
||||
// GRPCQueryRouter routes ABCI Query requests to GRPC handlers
|
||||
type GRPCQueryRouter struct {
|
||||
routes map[string]GRPCQueryHandler
|
||||
cdc encoding.Codec
|
||||
// routes maps query handlers used in ABCIQuery.
|
||||
routes map[string]GRPCQueryHandler
|
||||
// handlerByMessageName maps the request name to the handler. It is a hybrid handler which seamlessly
|
||||
// handles both gogo and protov2 messages.
|
||||
handlerByMessageName map[string][]func(ctx context.Context, req, resp protoiface.MessageV1) error
|
||||
// binaryCodec is used to encode/decode binary protobuf messages.
|
||||
binaryCodec codec.BinaryCodec
|
||||
// cdc is the gRPC codec used by the router to correctly unmarshal messages.
|
||||
cdc encoding.Codec
|
||||
// serviceData contains the gRPC services and their handlers.
|
||||
serviceData []serviceData
|
||||
}
|
||||
|
||||
@ -32,7 +45,8 @@ var _ gogogrpc.Server = &GRPCQueryRouter{}
|
||||
// NewGRPCQueryRouter creates a new GRPCQueryRouter
|
||||
func NewGRPCQueryRouter() *GRPCQueryRouter {
|
||||
return &GRPCQueryRouter{
|
||||
routes: map[string]GRPCQueryHandler{},
|
||||
routes: map[string]GRPCQueryHandler{},
|
||||
handlerByMessageName: map[string][]func(ctx context.Context, req, resp protoiface.MessageV1) error{},
|
||||
}
|
||||
}
|
||||
|
||||
@ -58,46 +72,13 @@ func (qrt *GRPCQueryRouter) Route(path string) GRPCQueryHandler {
|
||||
func (qrt *GRPCQueryRouter) RegisterService(sd *grpc.ServiceDesc, handler interface{}) {
|
||||
// adds a top-level query handler based on the gRPC service name
|
||||
for _, method := range sd.Methods {
|
||||
fqName := fmt.Sprintf("/%s/%s", sd.ServiceName, method.MethodName)
|
||||
methodHandler := method.Handler
|
||||
|
||||
// Check that each service is only registered once. If a service is
|
||||
// registered more than once, then we should error. Since we can't
|
||||
// return an error (`Server.RegisterService` interface restriction) we
|
||||
// panic (at startup).
|
||||
_, found := qrt.routes[fqName]
|
||||
if found {
|
||||
panic(
|
||||
fmt.Errorf(
|
||||
"gRPC query service %s has already been registered. Please make sure to only register each service once. "+
|
||||
"This usually means that there are conflicting modules registering the same gRPC query service",
|
||||
fqName,
|
||||
),
|
||||
)
|
||||
err := qrt.registerABCIQueryHandler(sd, method, handler)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
qrt.routes[fqName] = func(ctx sdk.Context, req *abci.RequestQuery) (*abci.ResponseQuery, error) {
|
||||
// call the method handler from the service description with the handler object,
|
||||
// a wrapped sdk.Context with proto-unmarshaled data from the ABCI request data
|
||||
res, err := methodHandler(handler, ctx, func(i interface{}) error {
|
||||
return qrt.cdc.Unmarshal(req.Data, i)
|
||||
}, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// proto marshal the result bytes
|
||||
var resBytes []byte
|
||||
resBytes, err = qrt.cdc.Marshal(res)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// return the result bytes as the response value
|
||||
return &abci.ResponseQuery{
|
||||
Height: req.Height,
|
||||
Value: resBytes,
|
||||
}, nil
|
||||
err = qrt.registerHandlerByMessageName(sd, method, handler)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
@ -107,9 +88,77 @@ func (qrt *GRPCQueryRouter) RegisterService(sd *grpc.ServiceDesc, handler interf
|
||||
})
|
||||
}
|
||||
|
||||
func (qrt *GRPCQueryRouter) registerABCIQueryHandler(sd *grpc.ServiceDesc, method grpc.MethodDesc, handler interface{}) error {
|
||||
fqName := fmt.Sprintf("/%s/%s", sd.ServiceName, method.MethodName)
|
||||
methodHandler := method.Handler
|
||||
|
||||
// Check that each service is only registered once. If a service is
|
||||
// registered more than once, then we should error. Since we can't
|
||||
// return an error (`Server.RegisterService` interface restriction) we
|
||||
// panic (at startup).
|
||||
_, found := qrt.routes[fqName]
|
||||
if found {
|
||||
return fmt.Errorf(
|
||||
"gRPC query service %s has already been registered. Please make sure to only register each service once. "+
|
||||
"This usually means that there are conflicting modules registering the same gRPC query service",
|
||||
fqName,
|
||||
)
|
||||
}
|
||||
|
||||
qrt.routes[fqName] = func(ctx sdk.Context, req *abci.RequestQuery) (*abci.ResponseQuery, error) {
|
||||
// call the method handler from the service description with the handler object,
|
||||
// a wrapped sdk.Context with proto-unmarshaled data from the ABCI request data
|
||||
res, err := methodHandler(handler, ctx, func(i interface{}) error {
|
||||
return qrt.cdc.Unmarshal(req.Data, i)
|
||||
}, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// proto marshal the result bytes
|
||||
var resBytes []byte
|
||||
resBytes, err = qrt.cdc.Marshal(res)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// return the result bytes as the response value
|
||||
return &abci.ResponseQuery{
|
||||
Height: req.Height,
|
||||
Value: resBytes,
|
||||
}, nil
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (qrt *GRPCQueryRouter) HandlersByRequestName(name string) []func(ctx context.Context, req, resp protoiface.MessageV1) error {
|
||||
return qrt.handlerByMessageName[name]
|
||||
}
|
||||
|
||||
func (qrt *GRPCQueryRouter) registerHandlerByMessageName(sd *grpc.ServiceDesc, method grpc.MethodDesc, handler interface{}) error {
|
||||
// extract message name from method descriptor
|
||||
methodFullName := protoreflect.FullName(fmt.Sprintf("%s.%s", sd.ServiceName, method.MethodName))
|
||||
desc, err := proto.HybridResolver.FindDescriptorByName(methodFullName)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot find method descriptor %s", methodFullName)
|
||||
}
|
||||
methodDesc, ok := desc.(protoreflect.MethodDescriptor)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid method descriptor %s", methodFullName)
|
||||
}
|
||||
inputName := methodDesc.Input().FullName()
|
||||
methodHandler, err := protocompat.MakeHybridHandler(qrt.binaryCodec, sd, method, handler)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
qrt.handlerByMessageName[string(inputName)] = append(qrt.handlerByMessageName[string(inputName)], methodHandler)
|
||||
return nil
|
||||
}
|
||||
|
||||
// SetInterfaceRegistry sets the interface registry for the router. This will
|
||||
// also register the interface reflection gRPC service.
|
||||
func (qrt *GRPCQueryRouter) SetInterfaceRegistry(interfaceRegistry codectypes.InterfaceRegistry) {
|
||||
qrt.binaryCodec = codec.NewProtoCodec(interfaceRegistry)
|
||||
// instantiate the codec
|
||||
qrt.cdc = codec.NewProtoCodec(interfaceRegistry).GRPCCodec()
|
||||
// Once we have an interface registry, we can register the interface
|
||||
|
||||
@ -53,6 +53,50 @@ func TestGRPCQueryRouter(t *testing.T) {
|
||||
require.Equal(t, spot, res3.HasAnimal.Animal.GetCachedValue())
|
||||
}
|
||||
|
||||
func TestGRPCRouterHybridHandlers(t *testing.T) {
|
||||
assertRouterBehaviour := func(helper *baseapp.QueryServiceTestHelper) {
|
||||
// test getting the handler by name
|
||||
handlers := helper.GRPCQueryRouter.HandlersByRequestName("testpb.EchoRequest")
|
||||
require.NotNil(t, handlers)
|
||||
require.Len(t, handlers, 1)
|
||||
handler := handlers[0]
|
||||
// sending a protov2 message should work, and return a protov2 message
|
||||
v2Resp := new(testdata_pulsar.EchoResponse)
|
||||
err := handler(helper.Ctx, &testdata_pulsar.EchoRequest{Message: "hello"}, v2Resp)
|
||||
require.Nil(t, err)
|
||||
require.Equal(t, "hello", v2Resp.Message)
|
||||
// also sending a protov1 message should work, and return a gogoproto message
|
||||
gogoResp := new(testdata.EchoResponse)
|
||||
err = handler(helper.Ctx, &testdata.EchoRequest{Message: "hello"}, gogoResp)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, "hello", gogoResp.Message)
|
||||
}
|
||||
|
||||
t.Run("protov2 server", func(t *testing.T) {
|
||||
qr := baseapp.NewGRPCQueryRouter()
|
||||
interfaceRegistry := testdata.NewTestInterfaceRegistry()
|
||||
qr.SetInterfaceRegistry(interfaceRegistry)
|
||||
testdata_pulsar.RegisterQueryServer(qr, testdata_pulsar.QueryImpl{})
|
||||
helper := &baseapp.QueryServiceTestHelper{
|
||||
GRPCQueryRouter: qr,
|
||||
Ctx: sdk.Context{}.WithContext(context.Background()),
|
||||
}
|
||||
assertRouterBehaviour(helper)
|
||||
})
|
||||
|
||||
t.Run("gogoproto server", func(t *testing.T) {
|
||||
qr := baseapp.NewGRPCQueryRouter()
|
||||
interfaceRegistry := testdata.NewTestInterfaceRegistry()
|
||||
qr.SetInterfaceRegistry(interfaceRegistry)
|
||||
testdata.RegisterQueryServer(qr, testdata.QueryImpl{})
|
||||
helper := &baseapp.QueryServiceTestHelper{
|
||||
GRPCQueryRouter: qr,
|
||||
Ctx: sdk.Context{}.WithContext(context.Background()),
|
||||
}
|
||||
assertRouterBehaviour(helper)
|
||||
})
|
||||
}
|
||||
|
||||
func TestRegisterQueryServiceTwice(t *testing.T) {
|
||||
// Setup baseapp.
|
||||
var appBuilder *runtime.AppBuilder
|
||||
|
||||
207
baseapp/internal/protocompat/protocompat.go
Normal file
207
baseapp/internal/protocompat/protocompat.go
Normal file
@ -0,0 +1,207 @@
|
||||
package protocompat
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"reflect"
|
||||
|
||||
gogoproto "github.com/cosmos/gogoproto/proto"
|
||||
"google.golang.org/grpc"
|
||||
proto2 "google.golang.org/protobuf/proto"
|
||||
"google.golang.org/protobuf/reflect/protoreflect"
|
||||
"google.golang.org/protobuf/reflect/protoregistry"
|
||||
"google.golang.org/protobuf/runtime/protoiface"
|
||||
|
||||
"github.com/cosmos/cosmos-sdk/codec"
|
||||
)
|
||||
|
||||
var (
|
||||
gogoType = reflect.TypeOf((*gogoproto.Message)(nil)).Elem()
|
||||
protov2Type = reflect.TypeOf((*proto2.Message)(nil)).Elem()
|
||||
)
|
||||
|
||||
type Handler = func(ctx context.Context, request, response protoiface.MessageV1) error
|
||||
|
||||
func MakeHybridHandler(cdc codec.BinaryCodec, sd *grpc.ServiceDesc, method grpc.MethodDesc, handler interface{}) (Handler, error) {
|
||||
methodFullName := protoreflect.FullName(fmt.Sprintf("%s.%s", sd.ServiceName, method.MethodName))
|
||||
desc, err := gogoproto.HybridResolver.FindDescriptorByName(methodFullName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
methodDesc, ok := desc.(protoreflect.MethodDescriptor)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("invalid method descriptor %s", methodFullName)
|
||||
}
|
||||
|
||||
isProtov2Handler, err := isProtov2(method)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if isProtov2Handler {
|
||||
return makeProtoV2HybridHandler(methodDesc, cdc, method, handler)
|
||||
}
|
||||
return makeGogoHybridHandler(methodDesc, cdc, method, handler)
|
||||
}
|
||||
|
||||
// makeProtoV2HybridHandler returns a handler that can handle both gogo and protov2 messages.
|
||||
func makeProtoV2HybridHandler(prefMethod protoreflect.MethodDescriptor, cdc codec.BinaryCodec, method grpc.MethodDesc, handler any) (Handler, error) {
|
||||
// it's a protov2 handler, if a gogo counterparty is not found we cannot handle gogo messages.
|
||||
gogoExists := gogoproto.MessageType(string(prefMethod.Output().FullName())) != nil
|
||||
if !gogoExists {
|
||||
return func(ctx context.Context, inReq, outResp protoiface.MessageV1) error {
|
||||
protov2Request, ok := inReq.(proto2.Message)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid request type %T, method %s does not accept gogoproto messages", inReq, prefMethod.FullName())
|
||||
}
|
||||
resp, err := method.Handler(handler, ctx, func(msg any) error {
|
||||
proto2.Merge(msg.(proto2.Message), protov2Request)
|
||||
return nil
|
||||
}, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// merge on the resp
|
||||
proto2.Merge(outResp.(proto2.Message), resp.(proto2.Message))
|
||||
return nil
|
||||
}, nil
|
||||
}
|
||||
return func(ctx context.Context, inReq, outResp protoiface.MessageV1) error {
|
||||
// we check if the request is a protov2 message.
|
||||
switch m := inReq.(type) {
|
||||
case proto2.Message:
|
||||
// we can just call the handler after making a copy of the message, for safety reasons.
|
||||
resp, err := method.Handler(handler, ctx, func(msg any) error {
|
||||
proto2.Merge(msg.(proto2.Message), m)
|
||||
return nil
|
||||
}, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// merge on the resp
|
||||
proto2.Merge(outResp.(proto2.Message), resp.(proto2.Message))
|
||||
return nil
|
||||
case gogoproto.Message:
|
||||
// we need to marshal and unmarshal the request.
|
||||
requestBytes, err := cdc.Marshal(m)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
resp, err := method.Handler(handler, ctx, func(msg any) error {
|
||||
// unmarshal request into the message.
|
||||
return proto2.Unmarshal(requestBytes, msg.(proto2.Message))
|
||||
}, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// the response is a protov2 message, so we cannot just return it.
|
||||
// since the request came as gogoproto, we expect the response
|
||||
// to also be gogoproto.
|
||||
respBytes, err := proto2.Marshal(resp.(proto2.Message))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// unmarshal response into a gogo message.
|
||||
return cdc.Unmarshal(respBytes, outResp.(gogoproto.Message))
|
||||
default:
|
||||
panic("unreachable")
|
||||
}
|
||||
}, nil
|
||||
}
|
||||
|
||||
func makeGogoHybridHandler(prefMethod protoreflect.MethodDescriptor, cdc codec.BinaryCodec, method grpc.MethodDesc, handler any) (Handler, error) {
|
||||
// it's a gogo handler, we check if the existing protov2 counterparty exists.
|
||||
_, err := protoregistry.GlobalTypes.FindMessageByName(prefMethod.Output().FullName())
|
||||
if err != nil {
|
||||
// this can only be a gogo message.
|
||||
return func(ctx context.Context, inReq, outResp protoiface.MessageV1) error {
|
||||
_, ok := inReq.(proto2.Message)
|
||||
if ok {
|
||||
return fmt.Errorf("invalid request type %T, method %s does not accept protov2 messages", inReq, prefMethod.FullName())
|
||||
}
|
||||
resp, err := method.Handler(handler, ctx, func(msg any) error {
|
||||
// merge! ref: https://github.com/cosmos/cosmos-sdk/issues/18003
|
||||
gogoproto.Merge(msg.(gogoproto.Message), inReq)
|
||||
return nil
|
||||
}, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// merge resp, ref: https://github.com/cosmos/cosmos-sdk/issues/18003
|
||||
gogoproto.Merge(outResp.(gogoproto.Message), resp.(gogoproto.Message))
|
||||
return nil
|
||||
}, nil
|
||||
}
|
||||
// this is a gogo handler, and we have a protov2 counterparty.
|
||||
return func(ctx context.Context, inReq, outResp protoiface.MessageV1) error {
|
||||
switch m := inReq.(type) {
|
||||
case proto2.Message:
|
||||
// we need to marshal and unmarshal the request.
|
||||
requestBytes, err := proto2.Marshal(m)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
resp, err := method.Handler(handler, ctx, func(msg any) error {
|
||||
// unmarshal request into the message.
|
||||
return cdc.Unmarshal(requestBytes, msg.(gogoproto.Message))
|
||||
}, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// the response is a gogo message, so we cannot just return it.
|
||||
// since the request came as protov2, we expect the response
|
||||
// to also be protov2.
|
||||
respBytes, err := cdc.Marshal(resp.(gogoproto.Message))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// now we unmarshal back into a protov2 message.
|
||||
return proto2.Unmarshal(respBytes, outResp.(proto2.Message))
|
||||
case gogoproto.Message:
|
||||
// we can just call the handler after making a copy of the message, for safety reasons.
|
||||
resp, err := method.Handler(handler, ctx, func(msg any) error {
|
||||
// ref: https://github.com/cosmos/cosmos-sdk/issues/18003
|
||||
gogoproto.Merge(msg.(gogoproto.Message), m)
|
||||
return nil
|
||||
}, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// merge on the resp, ref: https://github.com/cosmos/cosmos-sdk/issues/18003
|
||||
gogoproto.Merge(outResp.(gogoproto.Message), resp.(gogoproto.Message))
|
||||
return nil
|
||||
default:
|
||||
panic("unreachable")
|
||||
}
|
||||
}, nil
|
||||
}
|
||||
|
||||
// isProtov2 returns true if the given method accepts protov2 messages.
|
||||
// Returns false if it does not.
|
||||
// It uses the decoder function passed to the method handler to determine
|
||||
// the type. Since the decoder function is passed in by the concrete implementer the expected
|
||||
// message where bytes are unmarshaled to, we can use that to determine the type.
|
||||
func isProtov2(md grpc.MethodDesc) (isV2Type bool, err error) {
|
||||
pullRequestType := func(msg interface{}) error {
|
||||
typ := reflect.TypeOf(msg)
|
||||
switch {
|
||||
case typ.Implements(protov2Type):
|
||||
isV2Type = true
|
||||
return nil
|
||||
case typ.Implements(gogoType):
|
||||
isV2Type = false
|
||||
return nil
|
||||
default:
|
||||
err = fmt.Errorf("invalid request type %T, expected protov2 or gogo message", msg)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
// doNotExecute is a dummy handler that stops the request execution.
|
||||
doNotExecute := func(_ context.Context, _ any, _ *grpc.UnaryServerInfo, _ grpc.UnaryHandler) (any, error) {
|
||||
return nil, nil
|
||||
}
|
||||
// we are allowed to pass in a nil context and nil request, since we are not actually executing the request.
|
||||
// this is made possible by the doNotExecute function which immediately returns without calling other handlers.
|
||||
_, _ = md.Handler(nil, nil, pullRequestType, doNotExecute)
|
||||
return
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user