diff --git a/server/v2/api/grpc/codec.go b/server/v2/api/grpc/codec.go new file mode 100644 index 0000000000..d0d885c041 --- /dev/null +++ b/server/v2/api/grpc/codec.go @@ -0,0 +1,95 @@ +package grpc + +import ( + "errors" + "fmt" + + gogoproto "github.com/cosmos/gogoproto/proto" + "google.golang.org/grpc/encoding" + "google.golang.org/protobuf/proto" + + _ "cosmossdk.io/api/amino" // Import amino.proto file for reflection + appmanager "cosmossdk.io/core/app" +) + +type protoCodec struct { + interfaceRegistry appmanager.InterfaceRegistry +} + +// newProtoCodec returns a reference to a new ProtoCodec +func newProtoCodec(interfaceRegistry appmanager.InterfaceRegistry) *protoCodec { + return &protoCodec{ + interfaceRegistry: interfaceRegistry, + } +} + +// Marshal implements BinaryMarshaler.Marshal method. +// NOTE: this function must be used with a concrete type which +// implements proto.Message. For interface please use the codec.MarshalInterface +func (pc *protoCodec) Marshal(o gogoproto.Message) ([]byte, error) { + // Size() check can catch the typed nil value. + if o == nil || gogoproto.Size(o) == 0 { + // return empty bytes instead of nil, because nil has special meaning in places like store.Set + return []byte{}, nil + } + + return gogoproto.Marshal(o) +} + +// Unmarshal implements BinaryMarshaler.Unmarshal method. +// NOTE: this function must be used with a concrete type which +// implements proto.Message. For interface please use the codec.UnmarshalInterface +func (pc *protoCodec) Unmarshal(bz []byte, ptr gogoproto.Message) error { + err := gogoproto.Unmarshal(bz, ptr) + if err != nil { + return err + } + // err = codectypes.UnpackInterfaces(ptr, pc.interfaceRegistry) // TODO: identify if needed for grpc + // if err != nil { + // return err + // } + return nil +} + +func (pc *protoCodec) Name() string { + return "cosmos-sdk-grpc-codec" +} + +// GRPCCodec returns the gRPC Codec for this specific ProtoCodec +func (pc *protoCodec) GRPCCodec() encoding.Codec { + return &grpcProtoCodec{cdc: pc} +} + +// grpcProtoCodec is the implementation of the gRPC proto codec. +type grpcProtoCodec struct { + cdc appmanager.ProtoCodec +} + +var errUnknownProtoType = errors.New("codec: unknown proto type") // sentinel error + +func (c grpcProtoCodec) Marshal(v any) ([]byte, error) { + switch m := v.(type) { + case proto.Message: + protov2MarshalOpts := proto.MarshalOptions{Deterministic: true} + return protov2MarshalOpts.Marshal(m) + case gogoproto.Message: + return c.cdc.Marshal(m) + default: + return nil, fmt.Errorf("%w: cannot marshal type %T", errUnknownProtoType, v) + } +} + +func (c grpcProtoCodec) Unmarshal(data []byte, v any) error { + switch m := v.(type) { + case proto.Message: + return proto.Unmarshal(data, m) + case gogoproto.Message: + return c.cdc.Unmarshal(data, m) + default: + return fmt.Errorf("%w: cannot unmarshal type %T", errUnknownProtoType, v) + } +} + +func (c grpcProtoCodec) Name() string { + return "cosmos-sdk-grpc-codec" +} diff --git a/server/v2/api/grpc/config.go b/server/v2/api/grpc/config.go index 1f117bd3a8..5cb0f24ed4 100644 --- a/server/v2/api/grpc/config.go +++ b/server/v2/api/grpc/config.go @@ -32,3 +32,20 @@ type Config struct { // The default value is math.MaxInt32. MaxSendMsgSize int `mapstructure:"max-send-msg-size" toml:"max-send-msg-size" comment:"MaxSendMsgSize defines the max message size in bytes the server can send.\nThe default value is math.MaxInt32."` } + +// CfgOption is a function that allows to overwrite the default server configuration. +type CfgOption func(*Config) + +// OverwriteDefaultConfig overwrites the default config with the new config. +func OverwriteDefaultConfig(newCfg *Config) CfgOption { + return func(cfg *Config) { + *cfg = *newCfg + } +} + +// Disable the grpc-gateway server by default (default enabled). +func Disable() CfgOption { + return func(cfg *Config) { + cfg.Enable = false + } +} diff --git a/server/v2/api/grpc/server.go b/server/v2/api/grpc/server.go index b569911e8b..20397ea7a8 100644 --- a/server/v2/api/grpc/server.go +++ b/server/v2/api/grpc/server.go @@ -2,19 +2,12 @@ package grpc import ( "context" - "errors" "fmt" "net" - gogogrpc "github.com/cosmos/gogoproto/grpc" - gogoproto "github.com/cosmos/gogoproto/proto" "github.com/spf13/viper" "google.golang.org/grpc" - "google.golang.org/grpc/encoding" - "google.golang.org/protobuf/proto" - _ "cosmossdk.io/api/amino" // Import amino.proto file for reflection - appmanager "cosmossdk.io/core/app" "cosmossdk.io/core/transaction" "cosmossdk.io/log" serverv2 "cosmossdk.io/server/v2" @@ -22,27 +15,26 @@ import ( ) type GRPCServer[AppT serverv2.AppI[T], T transaction.Tx] struct { - logger log.Logger - config *Config + logger log.Logger + config *Config + cfgOptions []CfgOption grpcSrv *grpc.Server } -type GRPCService interface { - // RegisterGRPCServer registers gRPC services directly with the gRPC server. - RegisterGRPCServer(gogogrpc.Server) -} - -func New[AppT serverv2.AppI[T], T transaction.Tx]() *GRPCServer[AppT, T] { - return &GRPCServer[AppT, T]{} +// New creates a new grpc server. +func New[AppT serverv2.AppI[T], T transaction.Tx](cfgOptions ...CfgOption) *GRPCServer[AppT, T] { + return &GRPCServer[AppT, T]{ + cfgOptions: cfgOptions, + } } // Init returns a correctly configured and initialized gRPC server. // Note, the caller is responsible for starting the server. -func (g *GRPCServer[AppT, T]) Init(appI AppT, v *viper.Viper, logger log.Logger) error { - cfg := DefaultConfig() +func (s *GRPCServer[AppT, T]) Init(appI AppT, v *viper.Viper, logger log.Logger) error { + cfg := s.Config().(*Config) if v != nil { - if err := v.Sub(g.Name()).Unmarshal(&cfg); err != nil { + if err := v.Sub(s.Name()).Unmarshal(&cfg); err != nil { return fmt.Errorf("failed to unmarshal config: %w", err) } } @@ -55,25 +47,42 @@ func (g *GRPCServer[AppT, T]) Init(appI AppT, v *viper.Viper, logger log.Logger) // appI.RegisterGRPCServer(grpcSrv) - // Reflection allows external clients to see what services and methods - // the gRPC server exposes. + // Reflection allows external clients to see what services and methods the gRPC server exposes. gogoreflection.Register(grpcSrv) - g.grpcSrv = grpcSrv - g.config = cfg - g.logger = logger.With(log.ModuleKey, g.Name()) + s.grpcSrv = grpcSrv + s.config = cfg + s.logger = logger.With(log.ModuleKey, s.Name()) return nil } -func (g *GRPCServer[AppT, T]) Name() string { +func (s *GRPCServer[AppT, T]) Name() string { return "grpc" } -func (g *GRPCServer[AppT, T]) Start(ctx context.Context) error { - listener, err := net.Listen("tcp", g.config.Address) +func (s *GRPCServer[AppT, T]) Config() any { + if s.config == nil || s.config == (&Config{}) { + cfg := DefaultConfig() + // overwrite the default config with the provided options + for _, opt := range s.cfgOptions { + opt(cfg) + } + + return cfg + } + + return s.config +} + +func (s *GRPCServer[AppT, T]) Start(ctx context.Context) error { + if !s.config.Enable { + return nil + } + + listener, err := net.Listen("tcp", s.config.Address) if err != nil { - return fmt.Errorf("failed to listen on address %s: %w", g.config.Address, err) + return fmt.Errorf("failed to listen on address %s: %w", s.config.Address, err) } errCh := make(chan error) @@ -82,110 +91,24 @@ func (g *GRPCServer[AppT, T]) Start(ctx context.Context) error { // an error upon failure, which we'll send on the error channel that will be // consumed by the for block below. go func() { - g.logger.Info("starting gRPC server...", "address", g.config.Address) - errCh <- g.grpcSrv.Serve(listener) + s.logger.Info("starting gRPC server...", "address", s.config.Address) + errCh <- s.grpcSrv.Serve(listener) }() // Start a blocking select to wait for an indication to stop the server or that // the server failed to start properly. err = <-errCh - g.logger.Error("failed to start gRPC server", "err", err) + s.logger.Error("failed to start gRPC server", "err", err) return err } -func (g *GRPCServer[AppT, T]) Stop(ctx context.Context) error { - g.logger.Info("stopping gRPC server...", "address", g.config.Address) - g.grpcSrv.GracefulStop() +func (s *GRPCServer[AppT, T]) Stop(ctx context.Context) error { + if !s.config.Enable { + return nil + } + + s.logger.Info("stopping gRPC server...", "address", s.config.Address) + s.grpcSrv.GracefulStop() return nil } - -func (g *GRPCServer[AppT, T]) Config() any { - if g.config == nil || g.config == (&Config{}) { - return DefaultConfig() - } - - return g.config -} - -type protoCodec struct { - interfaceRegistry appmanager.InterfaceRegistry -} - -// newProtoCodec returns a reference to a new ProtoCodec -func newProtoCodec(interfaceRegistry appmanager.InterfaceRegistry) *protoCodec { - return &protoCodec{ - interfaceRegistry: interfaceRegistry, - } -} - -// Marshal implements BinaryMarshaler.Marshal method. -// NOTE: this function must be used with a concrete type which -// implements proto.Message. For interface please use the codec.MarshalInterface -func (pc *protoCodec) Marshal(o gogoproto.Message) ([]byte, error) { - // Size() check can catch the typed nil value. - if o == nil || gogoproto.Size(o) == 0 { - // return empty bytes instead of nil, because nil has special meaning in places like store.Set - return []byte{}, nil - } - - return gogoproto.Marshal(o) -} - -// Unmarshal implements BinaryMarshaler.Unmarshal method. -// NOTE: this function must be used with a concrete type which -// implements proto.Message. For interface please use the codec.UnmarshalInterface -func (pc *protoCodec) Unmarshal(bz []byte, ptr gogoproto.Message) error { - err := gogoproto.Unmarshal(bz, ptr) - if err != nil { - return err - } - // err = codectypes.UnpackInterfaces(ptr, pc.interfaceRegistry) // TODO: identify if needed for grpc - // if err != nil { - // return err - // } - return nil -} - -func (pc *protoCodec) Name() string { - return "cosmos-sdk-grpc-codec" -} - -// GRPCCodec returns the gRPC Codec for this specific ProtoCodec -func (pc *protoCodec) GRPCCodec() encoding.Codec { - return &grpcProtoCodec{cdc: pc} -} - -// grpcProtoCodec is the implementation of the gRPC proto codec. -type grpcProtoCodec struct { - cdc appmanager.ProtoCodec -} - -var errUnknownProtoType = errors.New("codec: unknown proto type") // sentinel error - -func (g grpcProtoCodec) Marshal(v any) ([]byte, error) { - switch m := v.(type) { - case proto.Message: - protov2MarshalOpts := proto.MarshalOptions{Deterministic: true} - return protov2MarshalOpts.Marshal(m) - case gogoproto.Message: - return g.cdc.Marshal(m) - default: - return nil, fmt.Errorf("%w: cannot marshal type %T", errUnknownProtoType, v) - } -} - -func (g grpcProtoCodec) Unmarshal(data []byte, v any) error { - switch m := v.(type) { - case proto.Message: - return proto.Unmarshal(data, m) - case gogoproto.Message: - return g.cdc.Unmarshal(data, m) - default: - return fmt.Errorf("%w: cannot unmarshal type %T", errUnknownProtoType, v) - } -} - -func (g grpcProtoCodec) Name() string { - return "cosmos-sdk-grpc-codec" -} diff --git a/server/v2/api/grpcgateway/config.go b/server/v2/api/grpcgateway/config.go index 2436f14c20..c5ccb3bfe2 100644 --- a/server/v2/api/grpcgateway/config.go +++ b/server/v2/api/grpcgateway/config.go @@ -1,12 +1,28 @@ package grpcgateway +func DefaultConfig() *Config { + return &Config{ + Enable: true, + } +} + type Config struct { // Enable defines if the gRPC-gateway should be enabled. Enable bool `mapstructure:"enable" toml:"enable" comment:"Enable defines if the gRPC-gateway should be enabled."` } -func DefaultConfig() Config { - return Config{ - Enable: true, +type CfgOption func(*Config) + +// OverwriteDefaultConfig overwrites the default config with the new config. +func OverwriteDefaultConfig(newCfg *Config) CfgOption { + return func(cfg *Config) { + *cfg = *newCfg + } +} + +// Disable the grpc server by default (default enabled). +func Disable() CfgOption { + return func(cfg *Config) { + cfg.Enable = false } } diff --git a/server/v2/api/grpcgateway/server.go b/server/v2/api/grpcgateway/server.go index ae20e49fb2..220e8b32f6 100644 --- a/server/v2/api/grpcgateway/server.go +++ b/server/v2/api/grpcgateway/server.go @@ -1,6 +1,8 @@ package grpcgateway import ( + "context" + "fmt" "net/http" "strings" @@ -8,25 +10,34 @@ import ( "github.com/cosmos/gogoproto/jsonpb" "github.com/gorilla/mux" "github.com/grpc-ecosystem/grpc-gateway/runtime" + "github.com/spf13/viper" "google.golang.org/grpc" + "cosmossdk.io/core/transaction" "cosmossdk.io/log" + serverv2 "cosmossdk.io/server/v2" ) +var _ serverv2.ServerComponent[ + serverv2.AppI[transaction.Tx], transaction.Tx, +] = (*GRPCGatewayServer[serverv2.AppI[transaction.Tx], transaction.Tx])(nil) + const ( // GRPCBlockHeightHeader is the gRPC header for block height. GRPCBlockHeightHeader = "x-cosmos-block-height" ) -type Server struct { - logger log.Logger +type GRPCGatewayServer[AppT serverv2.AppI[T], T transaction.Tx] struct { + logger log.Logger + config *Config + cfgOptions []CfgOption + GRPCSrv *grpc.Server GRPCGatewayRouter *runtime.ServeMux - config Config } // New creates a new gRPC-gateway server. -func New(logger log.Logger, grpcSrv *grpc.Server, cfg Config, ir jsonpb.AnyResolver) *Server { +func New[AppT serverv2.AppI[T], T transaction.Tx](grpcSrv *grpc.Server, ir jsonpb.AnyResolver, cfgOptions ...CfgOption) *GRPCGatewayServer[AppT, T] { // The default JSON marshaller used by the gRPC-Gateway is unable to marshal non-nullable non-scalar fields. // Using the gogo/gateway package with the gRPC-Gateway WithMarshaler option fixes the scalar field marshaling issue. marshalerOption := &gateway.JSONPb{ @@ -35,8 +46,9 @@ func New(logger log.Logger, grpcSrv *grpc.Server, cfg Config, ir jsonpb.AnyResol OrigName: true, AnyResolver: ir, } - return &Server{ - logger: logger, + + return &GRPCGatewayServer[AppT, T]{ + GRPCSrv: grpcSrv, GRPCGatewayRouter: runtime.NewServeMux( // Custom marshaler option is required for gogo proto runtime.WithMarshalerOption(runtime.MIMEWildcard, marshalerOption), @@ -49,10 +61,74 @@ func New(logger log.Logger, grpcSrv *grpc.Server, cfg Config, ir jsonpb.AnyResol // GRPC metadata runtime.WithIncomingHeaderMatcher(CustomGRPCHeaderMatcher), ), - config: cfg, + cfgOptions: cfgOptions, } } +func (g *GRPCGatewayServer[AppT, T]) Name() string { + return "grpc-gateway" +} + +func (s *GRPCGatewayServer[AppT, T]) Config() any { + if s.config == nil || s.config == (&Config{}) { + cfg := DefaultConfig() + // overwrite the default config with the provided options + for _, opt := range s.cfgOptions { + opt(cfg) + } + + return cfg + } + + return s.config +} + +func (s *GRPCGatewayServer[AppT, T]) Init(appI AppT, v *viper.Viper, logger log.Logger) error { + cfg := s.Config().(*Config) + if v != nil { + if err := v.Sub(s.Name()).Unmarshal(&cfg); err != nil { + return fmt.Errorf("failed to unmarshal config: %w", err) + } + } + + // Register the gRPC-Gateway server. + // appI.RegisterGRPCGatewayRoutes(s.GRPCGatewayRouter, s.GRPCSrv) + + s.logger = logger + s.config = cfg + + return nil +} + +func (s *GRPCGatewayServer[AppT, T]) Start(ctx context.Context) error { + if !s.config.Enable { + return nil + } + + // TODO start a normal Go http server (and do not leverage comet's like https://github.com/cosmos/cosmos-sdk/blob/9df6019de6ee7999fe9864bac836deb2f36dd44a/server/api/server.go#L98) + + return nil +} + +func (s *GRPCGatewayServer[AppT, T]) Stop(ctx context.Context) error { + if !s.config.Enable { + return nil + } + + return nil +} + +// Register implements registers a grpc-gateway server +func (s *GRPCGatewayServer[AppT, T]) Register(r mux.Router) error { + // configure grpc-gatway server + r.PathPrefix("/").Handler(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + // Fall back to grpc gateway server. + s.GRPCGatewayRouter.ServeHTTP(w, req) + })) + + return nil +} + // CustomGRPCHeaderMatcher for mapping request headers to // GRPC metadata. // HTTP headers that start with 'Grpc-Metadata-' are automatically mapped to @@ -67,16 +143,3 @@ func CustomGRPCHeaderMatcher(key string) (string, bool) { return runtime.DefaultHeaderMatcher(key) } } - -// Register implements registers a grpc-gateway server -func (s *Server) Register(r mux.Router) error { - // configure grpc-gatway server - if s.config.Enable { - r.PathPrefix("/").Handler(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - // Fall back to grpc gateway server. - s.GRPCGatewayRouter.ServeHTTP(w, req) - })) - } - - return nil -} diff --git a/server/v2/cometbft/config.go b/server/v2/cometbft/config.go index f5b2359e8b..d35839eaaa 100644 --- a/server/v2/cometbft/config.go +++ b/server/v2/cometbft/config.go @@ -5,7 +5,6 @@ import ( "github.com/spf13/viper" serverv2 "cosmossdk.io/server/v2" - "cosmossdk.io/server/v2/api/grpc" "cosmossdk.io/server/v2/cometbft/types" ) @@ -41,12 +40,19 @@ type Config struct { Addr string `mapstructure:"addr" toml:"addr"` Standalone bool `mapstructure:"standalone" toml:"standalone"` Trace bool `mapstructure:"trace" toml:"trace"` - - GrpcConfig grpc.Config - - // MempoolConfig - CmtConfig *cmtcfg.Config - // Must be set by the application to grant authority to the consensus engine to send messages to the consensus module ConsensusAuthority string + + // config.toml + CmtConfig *cmtcfg.Config +} + +// CmtCfgOption is a function that allows to overwrite the default server configuration. +type CmtCfgOption func(*cmtcfg.Config) + +// OverwriteDefaultConfig overwrites the default config with the new config. +func OverwriteDefaultCometConfig(newCfg *cmtcfg.Config) CmtCfgOption { + return func(cfg *cmtcfg.Config) { // nolint:staticcheck // We want to overwrite everything + cfg = newCfg // nolint:ineffassign,staticcheck // We want to overwrite everything + } } diff --git a/server/v2/cometbft/server.go b/server/v2/cometbft/server.go index 4088a4bd84..1b1f0ff663 100644 --- a/server/v2/cometbft/server.go +++ b/server/v2/cometbft/server.go @@ -41,16 +41,18 @@ type CometBFTServer[AppT serverv2.AppI[T], T transaction.Tx] struct { Node *node.Node Consensus *Consensus[T] - initTxCodec transaction.Codec[T] - logger log.Logger - config Config - options ServerOptions[T] + initTxCodec transaction.Codec[T] + logger log.Logger + config Config + options ServerOptions[T] + cmtConfigOptions []CmtCfgOption } -func New[AppT serverv2.AppI[T], T transaction.Tx](txCodec transaction.Codec[T], options ServerOptions[T]) *CometBFTServer[AppT, T] { +func New[AppT serverv2.AppI[T], T transaction.Tx](txCodec transaction.Codec[T], options ServerOptions[T], cfgOptions ...CmtCfgOption) *CometBFTServer[AppT, T] { return &CometBFTServer[AppT, T]{ - initTxCodec: txCodec, - options: options, + initTxCodec: txCodec, + options: options, + cmtConfigOptions: cfgOptions, } } @@ -208,6 +210,10 @@ func (s *CometBFTServer[AppT, T]) CLICommands() serverv2.CLIConfig { func (s *CometBFTServer[AppT, T]) WriteDefaultConfigAt(configPath string) error { cometConfig := cmtcfg.DefaultConfig() + for _, opt := range s.cmtConfigOptions { + opt(cometConfig) + } + cmtcfg.WriteConfigFile(filepath.Join(configPath, "config.toml"), cometConfig) return nil } diff --git a/server/v2/server_mock_test.go b/server/v2/server_mock_test.go index 10641a153e..ae238d66a5 100644 --- a/server/v2/server_mock_test.go +++ b/server/v2/server_mock_test.go @@ -33,6 +33,10 @@ func (s *mockServer) Name() string { return s.name } +func (s *mockServer) Init(appI serverv2.AppI[transaction.Tx], v *viper.Viper, logger log.Logger) error { + return nil +} + func (s *mockServer) Start(ctx context.Context) error { for ctx.Err() == nil { s.ch <- fmt.Sprintf("%s mock server: %d", s.name, rand.Int()) @@ -54,7 +58,3 @@ func (s *mockServer) Stop(ctx context.Context) error { func (s *mockServer) Config() any { return MockServerDefaultConfig() } - -func (s *mockServer) Init(appI serverv2.AppI[transaction.Tx], v *viper.Viper, logger log.Logger) error { - return nil -}