feat(serverv2): integrate gRPC (backport #21038) (#21132)

Co-authored-by: testinginprod <98415576+testinginprod@users.noreply.github.com>
Co-authored-by: Julien Robert <julien@rbrt.fr>
This commit is contained in:
mergify[bot] 2024-07-31 23:21:30 +02:00 committed by GitHub
parent 3135f41a6d
commit 0ca08e1ee1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 189 additions and 152 deletions

View File

@ -44,9 +44,9 @@ type App[T transaction.Tx] struct {
amino legacy.Amino
moduleManager *MM[T]
// GRPCQueryDecoders maps gRPC method name to a function that decodes the request
// GRPCMethodsToMessageMap maps gRPC method name to a function that decodes the request
// bytes into a gogoproto.Message, which then can be passed to appmanager.
GRPCQueryDecoders map[string]func(requestBytes []byte) (gogoproto.Message, error)
GRPCMethodsToMessageMap map[string]func() gogoproto.Message
}
// Name returns the app name.
@ -118,6 +118,6 @@ func (a *App[T]) GetAppManager() *appmanager.AppManager[T] {
return a.AppManager
}
func (a *App[T]) GetGRPCQueryDecoders() map[string]func(requestBytes []byte) (gogoproto.Message, error) {
return a.GRPCQueryDecoders
func (a *App[T]) GetGPRCMethodsToMessageMap() map[string]func() gogoproto.Message {
return a.GRPCMethodsToMessageMap
}

View File

@ -556,7 +556,7 @@ func (m *MM[T]) assertNoForgottenModules(
func registerServices[T transaction.Tx](s appmodule.HasServices, app *App[T], registry *protoregistry.Files) error {
c := &configurator{
grpcQueryDecoders: map[string]func([]byte) (gogoproto.Message, error){},
grpcQueryDecoders: map[string]func() gogoproto.Message{},
stfQueryRouter: app.queryRouterBuilder,
stfMsgRouter: app.msgRouterBuilder,
registry: registry,
@ -567,7 +567,10 @@ func registerServices[T transaction.Tx](s appmodule.HasServices, app *App[T], re
if err != nil {
return fmt.Errorf("unable to register services: %w", err)
}
app.GRPCQueryDecoders = c.grpcQueryDecoders
// merge maps
for path, decoder := range c.grpcQueryDecoders {
app.GRPCMethodsToMessageMap[path] = decoder
}
return nil
}
@ -576,7 +579,7 @@ var _ grpc.ServiceRegistrar = (*configurator)(nil)
type configurator struct {
// grpcQueryDecoders is required because module expose queries through gRPC
// this provides a way to route to modules using gRPC.
grpcQueryDecoders map[string]func([]byte) (gogoproto.Message, error)
grpcQueryDecoders map[string]func() gogoproto.Message
stfQueryRouter *stf.MsgRouterBuilder
stfMsgRouter *stf.MsgRouterBuilder
@ -618,11 +621,11 @@ func (c *configurator) registerQueryHandlers(sd *grpc.ServiceDesc, ss interface{
if typ == nil {
return fmt.Errorf("unable to find message in gogotype registry: %w", err)
}
decoderFunc := func(bytes []byte) (gogoproto.Message, error) {
msg := reflect.New(typ.Elem()).Interface().(gogoproto.Message)
return msg, gogoproto.Unmarshal(bytes, msg)
decoderFunc := func() gogoproto.Message {
return reflect.New(typ.Elem()).Interface().(gogoproto.Message)
}
c.grpcQueryDecoders[md.MethodName] = decoderFunc
methodName := fmt.Sprintf("/%s/%s", sd.ServiceName, md.MethodName)
c.grpcQueryDecoders[methodName] = decoderFunc
}
return nil
}

View File

@ -130,11 +130,12 @@ func ProvideAppBuilder[T transaction.Tx](
msgRouterBuilder := stf.NewMsgRouterBuilder()
app := &App[T]{
storeKeys: nil,
interfaceRegistrar: interfaceRegistrar,
amino: amino,
msgRouterBuilder: msgRouterBuilder,
queryRouterBuilder: stf.NewMsgRouterBuilder(), // TODO dedicated query router
storeKeys: nil,
interfaceRegistrar: interfaceRegistrar,
amino: amino,
msgRouterBuilder: msgRouterBuilder,
queryRouterBuilder: stf.NewMsgRouterBuilder(), // TODO dedicated query router
GRPCMethodsToMessageMap: map[string]func() proto.Message{},
}
appBuilder := &AppBuilder[T]{app: app}

View File

@ -42,33 +42,41 @@ import (
"errors"
"fmt"
"io"
"log"
"reflect"
"sort"
"strings"
"sync"
//nolint: staticcheck // keep this import for backward compatibility
"github.com/golang/protobuf/proto"
gogoproto "github.com/cosmos/gogoproto/proto"
dpb "github.com/golang/protobuf/protoc-gen-go/descriptor"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
rpb "google.golang.org/grpc/reflection/grpc_reflection_v1alpha"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/reflect/protoreflect"
"cosmossdk.io/core/log"
)
type serverReflectionServer struct {
rpb.UnimplementedServerReflectionServer
s *grpc.Server
methods []string
initSymbols sync.Once
serviceNames []string
symbols map[string]*dpb.FileDescriptorProto // map of fully-qualified names to files
log log.Logger
}
// Register registers the server reflection service on the given gRPC server.
func Register(s *grpc.Server) {
func Register(s *grpc.Server, methods []string, logger log.Logger) {
rpb.RegisterServerReflectionServer(s, &serverReflectionServer{
s: s,
s: s,
methods: methods,
log: logger,
})
}
@ -82,21 +90,12 @@ type protoMessage interface {
func (s *serverReflectionServer) getSymbols() (svcNames []string, symbolIndex map[string]*dpb.FileDescriptorProto) {
s.initSymbols.Do(func() {
serviceInfo := s.s.GetServiceInfo()
s.symbols = map[string]*dpb.FileDescriptorProto{}
s.serviceNames = make([]string, 0, len(serviceInfo))
services, fds := s.getServices(s.methods)
s.serviceNames = services
processed := map[string]struct{}{}
for svc, info := range serviceInfo {
s.serviceNames = append(s.serviceNames, svc)
fdenc, ok := parseMetadata(info.Metadata)
if !ok {
continue
}
fd, err := decodeFileDesc(fdenc)
if err != nil {
continue
}
for _, fd := range fds {
s.processFile(fd, processed)
}
sort.Strings(s.serviceNames)
@ -207,7 +206,7 @@ func decodeFileDesc(enc []byte) (*dpb.FileDescriptorProto, error) {
}
fd := new(dpb.FileDescriptorProto)
if err := proto.Unmarshal(raw, fd); err != nil {
if err := gogoproto.Unmarshal(raw, fd); err != nil {
return nil, fmt.Errorf("bad descriptor: %w", err)
}
return fd, nil
@ -237,7 +236,7 @@ func typeForName(name string) (reflect.Type, error) {
}
func fileDescContainingExtension(st reflect.Type, ext int32) (*dpb.FileDescriptorProto, error) {
m, ok := reflect.Zero(reflect.PtrTo(st)).Interface().(proto.Message)
m, ok := reflect.Zero(reflect.PtrTo(st)).Interface().(gogoproto.Message)
if !ok {
return nil, fmt.Errorf("failed to create message from type: %v", st)
}
@ -252,7 +251,7 @@ func fileDescContainingExtension(st reflect.Type, ext int32) (*dpb.FileDescripto
}
func (s *serverReflectionServer) allExtensionNumbersForType(st reflect.Type) ([]int32, error) {
m, ok := reflect.Zero(reflect.PtrTo(st)).Interface().(proto.Message)
m, ok := reflect.Zero(reflect.PtrTo(st)).Interface().(gogoproto.Message)
if !ok {
return nil, fmt.Errorf("failed to create message from type: %v", st)
}
@ -272,7 +271,7 @@ func fileDescWithDependencies(fd *dpb.FileDescriptorProto, sentFileDescriptors m
queue = queue[1:]
if sent := sentFileDescriptors[currentfd.GetName()]; len(r) == 0 || !sent {
sentFileDescriptors[currentfd.GetName()] = true
currentfdEncoded, err := proto.Marshal(currentfd)
currentfdEncoded, err := gogoproto.Marshal(currentfd)
if err != nil {
return nil, err
}
@ -305,24 +304,6 @@ func (s *serverReflectionServer) fileDescEncodingByFilename(name string, sentFil
return fileDescWithDependencies(fd, sentFileDescriptors)
}
// parseMetadata finds the file descriptor bytes specified meta.
// For SupportPackageIsVersion4, m is the name of the proto file, we
// call proto.FileDescriptor to get the byte slice.
// For SupportPackageIsVersion3, m is a byte slice itself.
func parseMetadata(meta interface{}) ([]byte, bool) {
// Check if meta is the file name.
if fileNameForMeta, ok := meta.(string); ok {
return getFileDescriptor(fileNameForMeta), true
}
// Check if meta is the byte slice.
if enc, ok := meta.([]byte); ok {
return enc, true
}
return nil, false
}
// fileDescEncodingContainingSymbol finds the file descriptor containing the
// given symbol, finds all of its previously unsent transitive dependencies,
// does marshaling on them, and returns the marshaled result. The given symbol
@ -446,7 +427,6 @@ func (s *serverReflectionServer) ServerReflectionInfo(stream rpb.ServerReflectio
ErrorMessage: err.Error(),
},
}
log.Printf("OH NO: %s", err)
} else {
out.MessageResponse = &rpb.ServerReflectionResponse_AllExtensionNumbersResponse{
AllExtensionNumbersResponse: &rpb.ExtensionNumberResponse{ //nolint:staticcheck // SA1019: we want to keep using v1alpha
@ -476,3 +456,28 @@ func (s *serverReflectionServer) ServerReflectionInfo(stream rpb.ServerReflectio
}
}
}
// getServices gets the unique list of services given a list of methods.
func (s *serverReflectionServer) getServices(methods []string) (svcs []string, fds []*dpb.FileDescriptorProto) {
registry, err := gogoproto.MergedRegistry()
if err != nil {
s.log.Error("unable to load merged registry", "err", err)
return nil, nil
}
seenSvc := map[protoreflect.FullName]struct{}{}
for _, methodName := range methods {
methodName = strings.Join(strings.Split(methodName[1:], "/"), ".")
md, err := registry.FindDescriptorByName(protoreflect.FullName(methodName))
if err != nil {
s.log.Error("unable to load method descriptor", "method", methodName, "err", err)
continue
}
svc := md.(protoreflect.MethodDescriptor).Parent()
if _, seen := seenSvc[svc.FullName()]; !seen {
svcs = append(svcs, string(svc.FullName()))
file := svc.ParentFile()
fds = append(fds, protodesc.ToFileDescriptorProto(file))
}
}
return
}

View File

@ -2,11 +2,20 @@ package grpc
import (
"context"
"errors"
"fmt"
"io"
"net"
"strconv"
"github.com/cosmos/gogoproto/proto"
"github.com/spf13/pflag"
"github.com/spf13/viper"
"golang.org/x/exp/maps"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"cosmossdk.io/core/transaction"
"cosmossdk.io/log"
@ -14,7 +23,12 @@ import (
"cosmossdk.io/server/v2/api/grpc/gogoreflection"
)
type GRPCServer[T transaction.Tx] struct {
const (
BlockHeightHeader = "x-cosmos-block-height"
FlagAddress = "address"
)
type Server[T transaction.Tx] struct {
logger log.Logger
config *Config
cfgOptions []CfgOption
@ -23,32 +37,34 @@ type GRPCServer[T transaction.Tx] struct {
}
// New creates a new grpc server.
func New[T transaction.Tx](cfgOptions ...CfgOption) *GRPCServer[T] {
return &GRPCServer[T]{
func New[T transaction.Tx](cfgOptions ...CfgOption) *Server[T] {
return &Server[T]{
cfgOptions: cfgOptions,
}
}
// Init returns a correctly configured and initialized gRPC server.
// Note, the caller is responsible for starting the server.
func (s *GRPCServer[T]) Init(appI serverv2.AppI[T], v *viper.Viper, logger log.Logger) error {
func (s *Server[T]) Init(appI serverv2.AppI[T], v *viper.Viper, logger log.Logger) error {
cfg := s.Config().(*Config)
if v != nil {
if err := serverv2.UnmarshalSubConfig(v, s.Name(), &cfg); err != nil {
return fmt.Errorf("failed to unmarshal config: %w", err)
}
}
methodsMap := appI.GetGPRCMethodsToMessageMap()
grpcSrv := grpc.NewServer(
grpc.ForceServerCodec(newProtoCodec(appI.InterfaceRegistry()).GRPCCodec()),
grpc.MaxSendMsgSize(cfg.MaxSendMsgSize),
grpc.MaxRecvMsgSize(cfg.MaxRecvMsgSize),
grpc.UnknownServiceHandler(
makeUnknownServiceHandler(methodsMap, appI.GetAppManager()),
),
)
// appI.RegisterGRPCServer(grpcSrv)
// Reflection allows external clients to see what services and methods the gRPC server exposes.
gogoreflection.Register(grpcSrv)
gogoreflection.Register(grpcSrv, maps.Keys(methodsMap), logger.With("sub-module", "grpc-reflection"))
s.grpcSrv = grpcSrv
s.config = cfg
@ -57,11 +73,88 @@ func (s *GRPCServer[T]) Init(appI serverv2.AppI[T], v *viper.Viper, logger log.L
return nil
}
func (s *GRPCServer[T]) Name() string {
func (s *Server[T]) StartCmdFlags() *pflag.FlagSet {
flags := pflag.NewFlagSet("grpc", pflag.ExitOnError)
// start flags are prefixed with the server name
// as the config in prefixed with the server name
// this allows viper to properly bind the flags
prefix := func(f string) string {
return fmt.Sprintf("%s.%s", s.Name(), f)
}
flags.String(prefix(FlagAddress), "localhost:9090", "Listen address")
return flags
}
func makeUnknownServiceHandler(messageMap map[string]func() proto.Message, querier interface {
Query(ctx context.Context, version uint64, msg proto.Message) (proto.Message, error)
},
) grpc.StreamHandler {
return func(srv any, stream grpc.ServerStream) error {
method, ok := grpc.MethodFromServerStream(stream)
if !ok {
return status.Error(codes.InvalidArgument, "unable to get method")
}
makeMsg, exists := messageMap[method]
if !exists {
return status.Errorf(codes.Unimplemented, "gRPC method %s is not handled", method)
}
for {
req := makeMsg()
err := stream.RecvMsg(req)
if err != nil {
if errors.Is(err, io.EOF) {
return nil
}
return err
}
// extract height header
ctx := stream.Context()
height, err := getHeightFromCtx(ctx)
if err != nil {
return status.Errorf(codes.InvalidArgument, "invalid get height from context: %v", err)
}
resp, err := querier.Query(ctx, height, req)
if err != nil {
return err
}
err = stream.SendMsg(resp)
if err != nil {
return err
}
}
}
}
func getHeightFromCtx(ctx context.Context) (uint64, error) {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return 0, nil
}
values := md.Get(BlockHeightHeader)
if len(values) == 0 {
return 0, nil
}
if len(values) != 1 {
return 0, fmt.Errorf("gRPC height metadata must be of length 1, got: %d", len(values))
}
heightStr := values[0]
height, err := strconv.ParseUint(heightStr, 10, 64)
if err != nil {
return 0, fmt.Errorf("unable to parse height string from gRPC metadata %s: %w", heightStr, err)
}
return height, nil
}
func (s *Server[T]) Name() string {
return "grpc"
}
func (s *GRPCServer[T]) Config() any {
func (s *Server[T]) Config() any {
if s.config == nil || s.config == (&Config{}) {
cfg := DefaultConfig()
// overwrite the default config with the provided options
@ -75,7 +168,7 @@ func (s *GRPCServer[T]) Config() any {
return s.config
}
func (s *GRPCServer[T]) Start(ctx context.Context) error {
func (s *Server[T]) Start(ctx context.Context) error {
if !s.config.Enable {
return nil
}
@ -102,7 +195,7 @@ func (s *GRPCServer[T]) Start(ctx context.Context) error {
return err
}
func (s *GRPCServer[T]) Stop(ctx context.Context) error {
func (s *Server[T]) Stop(ctx context.Context) error {
if !s.config.Enable {
return nil
}

View File

@ -41,7 +41,6 @@ type Consensus[T transaction.Tx] struct {
streaming streaming.Manager
snapshotManager *snapshots.Manager
mempool mempool.Mempool[T]
grpcQueryDecoders map[string]func(requestBytes []byte) (gogoproto.Message, error) // legacy support for gRPC
cfg Config
indexedEvents map[string]struct{}
@ -60,6 +59,8 @@ type Consensus[T transaction.Tx] struct {
addrPeerFilter types.PeerFilter // filter peers by address and port
idPeerFilter types.PeerFilter // filter peers by node ID
grpcMethodsMap map[string]func() gogoproto.Message // maps gRPC method to message creator func
}
func NewConsensus[T transaction.Tx](
@ -69,7 +70,7 @@ func NewConsensus[T transaction.Tx](
app *appmanager.AppManager[T],
mp mempool.Mempool[T],
indexedEvents map[string]struct{},
grpcQueryDecoders map[string]func(requestBytes []byte) (gogoproto.Message, error),
gRPCMethodsMap map[string]func() gogoproto.Message,
store types.Store,
cfg Config,
txCodec transaction.Codec[T],
@ -78,7 +79,7 @@ func NewConsensus[T transaction.Tx](
appName: appName,
version: getCometBFTServerVersion(),
consensusAuthority: consensusAuthority,
grpcQueryDecoders: grpcQueryDecoders,
grpcMethodsMap: gRPCMethodsMap,
app: app,
cfg: cfg,
store: store,
@ -173,9 +174,10 @@ func (c *Consensus[T]) Info(ctx context.Context, _ *abciproto.InfoRequest) (*abc
// It is called by cometbft to query application state.
func (c *Consensus[T]) Query(ctx context.Context, req *abciproto.QueryRequest) (resp *abciproto.QueryResponse, err error) {
// check if it's a gRPC method
grpcQueryDecoder, isGRPC := c.grpcQueryDecoders[req.Path]
makeGRPCRequest, isGRPC := c.grpcMethodsMap[req.Path]
if isGRPC {
protoRequest, err := grpcQueryDecoder(req.Data)
protoRequest := makeGRPCRequest()
err = gogoproto.Unmarshal(req.Data, protoRequest) // TODO: use codec
if err != nil {
return nil, fmt.Errorf("unable to decode gRPC request with path %s from ABCI.Query: %w", req.Path, err)
}

View File

@ -79,7 +79,7 @@ func (s *CometBFTServer[T]) Init(appI serverv2.AppI[T], v *viper.Viper, logger l
appI.GetAppManager(),
s.serverOptions.Mempool,
indexEvents,
appI.GetGRPCQueryDecoders(),
appI.GetGPRCMethodsToMessageMap(),
appI.GetStore().(types.Store),
s.config,
s.initTxCodec,

View File

@ -35,6 +35,7 @@ require (
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.19.0
github.com/stretchr/testify v1.9.0
golang.org/x/exp v0.0.0-20240531132922-fd00a4e0eefc
golang.org/x/sync v0.7.0
google.golang.org/grpc v1.64.1
google.golang.org/protobuf v1.34.2
@ -76,7 +77,6 @@ require (
github.com/subosito/gotenv v1.6.0 // indirect
github.com/tidwall/btree v1.7.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/exp v0.0.0-20240531132922-fd00a4e0eefc // indirect
golang.org/x/net v0.27.0 // indirect
golang.org/x/sys v0.22.0 // indirect
golang.org/x/text v0.16.0 // indirect

View File

@ -16,6 +16,7 @@ import (
"cosmossdk.io/log"
serverv2 "cosmossdk.io/server/v2"
grpc "cosmossdk.io/server/v2/api/grpc"
"cosmossdk.io/server/v2/appmanager"
)
type mockInterfaceRegistry struct{}
@ -33,6 +34,14 @@ type mockApp[T transaction.Tx] struct {
serverv2.AppI[T]
}
func (*mockApp[T]) GetGPRCMethodsToMessageMap() map[string]func() gogoproto.Message {
return map[string]func() gogoproto.Message{}
}
func (*mockApp[T]) GetAppManager() *appmanager.AppManager[T] {
return nil
}
func (*mockApp[T]) InterfaceRegistry() coreapp.InterfaceRegistry {
return &mockInterfaceRegistry{}
}

View File

@ -1,76 +0,0 @@
package store
import (
"context"
"fmt"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"cosmossdk.io/core/transaction"
"cosmossdk.io/log"
serverv2 "cosmossdk.io/server/v2"
)
// StoreComponent manages store config
// and contains prune & snapshot commands
type StoreComponent[T transaction.Tx] struct {
config *Config
}
func New[T transaction.Tx]() *StoreComponent[T] {
return &StoreComponent[T]{}
}
func (s *StoreComponent[T]) Init(appI serverv2.AppI[T], v *viper.Viper, logger log.Logger) error {
cfg := DefaultConfig()
if v != nil {
if err := v.Sub(s.Name()).Unmarshal(&cfg); err != nil {
return fmt.Errorf("failed to unmarshal config: %w", err)
}
}
s.config = cfg
return nil
}
func (s *StoreComponent[T]) Name() string {
return "store"
}
func (s *StoreComponent[T]) Start(ctx context.Context) error {
return nil
}
func (s *StoreComponent[T]) Stop(ctx context.Context) error {
return nil
}
func (s *StoreComponent[T]) GetCommands() []*cobra.Command {
return []*cobra.Command{
s.PrunesCmd(),
}
}
func (s *StoreComponent[T]) GetTxs() []*cobra.Command {
return nil
}
func (s *StoreComponent[T]) GetQueries() []*cobra.Command {
return nil
}
func (s *StoreComponent[T]) CLICommands() serverv2.CLIConfig {
return serverv2.CLIConfig{
Commands: []*cobra.Command{
s.PrunesCmd(),
},
}
}
func (g *StoreComponent[T]) Config() any {
if g.config == nil || g.config == (&Config{}) {
return DefaultConfig()
}
return g.config
}

View File

@ -17,6 +17,6 @@ type AppI[T transaction.Tx] interface {
InterfaceRegistry() coreapp.InterfaceRegistry
GetAppManager() *appmanager.AppManager[T]
GetConsensusAuthority() string
GetGRPCQueryDecoders() map[string]func(requestBytes []byte) (gogoproto.Message, error)
GetGPRCMethodsToMessageMap() map[string]func() gogoproto.Message
GetStore() any
}