feat(server/v2): add config overwrite (#20874)

This commit is contained in:
Julien Robert 2024-07-04 13:13:29 +02:00 committed by GitHub
parent 9df6019de6
commit b63d8404e4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 291 additions and 165 deletions

View File

@ -0,0 +1,95 @@
package grpc
import (
"errors"
"fmt"
gogoproto "github.com/cosmos/gogoproto/proto"
"google.golang.org/grpc/encoding"
"google.golang.org/protobuf/proto"
_ "cosmossdk.io/api/amino" // Import amino.proto file for reflection
appmanager "cosmossdk.io/core/app"
)
type protoCodec struct {
interfaceRegistry appmanager.InterfaceRegistry
}
// newProtoCodec returns a reference to a new ProtoCodec
func newProtoCodec(interfaceRegistry appmanager.InterfaceRegistry) *protoCodec {
return &protoCodec{
interfaceRegistry: interfaceRegistry,
}
}
// Marshal implements BinaryMarshaler.Marshal method.
// NOTE: this function must be used with a concrete type which
// implements proto.Message. For interface please use the codec.MarshalInterface
func (pc *protoCodec) Marshal(o gogoproto.Message) ([]byte, error) {
// Size() check can catch the typed nil value.
if o == nil || gogoproto.Size(o) == 0 {
// return empty bytes instead of nil, because nil has special meaning in places like store.Set
return []byte{}, nil
}
return gogoproto.Marshal(o)
}
// Unmarshal implements BinaryMarshaler.Unmarshal method.
// NOTE: this function must be used with a concrete type which
// implements proto.Message. For interface please use the codec.UnmarshalInterface
func (pc *protoCodec) Unmarshal(bz []byte, ptr gogoproto.Message) error {
err := gogoproto.Unmarshal(bz, ptr)
if err != nil {
return err
}
// err = codectypes.UnpackInterfaces(ptr, pc.interfaceRegistry) // TODO: identify if needed for grpc
// if err != nil {
// return err
// }
return nil
}
func (pc *protoCodec) Name() string {
return "cosmos-sdk-grpc-codec"
}
// GRPCCodec returns the gRPC Codec for this specific ProtoCodec
func (pc *protoCodec) GRPCCodec() encoding.Codec {
return &grpcProtoCodec{cdc: pc}
}
// grpcProtoCodec is the implementation of the gRPC proto codec.
type grpcProtoCodec struct {
cdc appmanager.ProtoCodec
}
var errUnknownProtoType = errors.New("codec: unknown proto type") // sentinel error
func (c grpcProtoCodec) Marshal(v any) ([]byte, error) {
switch m := v.(type) {
case proto.Message:
protov2MarshalOpts := proto.MarshalOptions{Deterministic: true}
return protov2MarshalOpts.Marshal(m)
case gogoproto.Message:
return c.cdc.Marshal(m)
default:
return nil, fmt.Errorf("%w: cannot marshal type %T", errUnknownProtoType, v)
}
}
func (c grpcProtoCodec) Unmarshal(data []byte, v any) error {
switch m := v.(type) {
case proto.Message:
return proto.Unmarshal(data, m)
case gogoproto.Message:
return c.cdc.Unmarshal(data, m)
default:
return fmt.Errorf("%w: cannot unmarshal type %T", errUnknownProtoType, v)
}
}
func (c grpcProtoCodec) Name() string {
return "cosmos-sdk-grpc-codec"
}

View File

@ -32,3 +32,20 @@ type Config struct {
// The default value is math.MaxInt32.
MaxSendMsgSize int `mapstructure:"max-send-msg-size" toml:"max-send-msg-size" comment:"MaxSendMsgSize defines the max message size in bytes the server can send.\nThe default value is math.MaxInt32."`
}
// CfgOption is a function that allows to overwrite the default server configuration.
type CfgOption func(*Config)
// OverwriteDefaultConfig overwrites the default config with the new config.
func OverwriteDefaultConfig(newCfg *Config) CfgOption {
return func(cfg *Config) {
*cfg = *newCfg
}
}
// Disable the grpc-gateway server by default (default enabled).
func Disable() CfgOption {
return func(cfg *Config) {
cfg.Enable = false
}
}

View File

@ -2,19 +2,12 @@ package grpc
import (
"context"
"errors"
"fmt"
"net"
gogogrpc "github.com/cosmos/gogoproto/grpc"
gogoproto "github.com/cosmos/gogoproto/proto"
"github.com/spf13/viper"
"google.golang.org/grpc"
"google.golang.org/grpc/encoding"
"google.golang.org/protobuf/proto"
_ "cosmossdk.io/api/amino" // Import amino.proto file for reflection
appmanager "cosmossdk.io/core/app"
"cosmossdk.io/core/transaction"
"cosmossdk.io/log"
serverv2 "cosmossdk.io/server/v2"
@ -22,27 +15,26 @@ import (
)
type GRPCServer[AppT serverv2.AppI[T], T transaction.Tx] struct {
logger log.Logger
config *Config
logger log.Logger
config *Config
cfgOptions []CfgOption
grpcSrv *grpc.Server
}
type GRPCService interface {
// RegisterGRPCServer registers gRPC services directly with the gRPC server.
RegisterGRPCServer(gogogrpc.Server)
}
func New[AppT serverv2.AppI[T], T transaction.Tx]() *GRPCServer[AppT, T] {
return &GRPCServer[AppT, T]{}
// New creates a new grpc server.
func New[AppT serverv2.AppI[T], T transaction.Tx](cfgOptions ...CfgOption) *GRPCServer[AppT, T] {
return &GRPCServer[AppT, T]{
cfgOptions: cfgOptions,
}
}
// Init returns a correctly configured and initialized gRPC server.
// Note, the caller is responsible for starting the server.
func (g *GRPCServer[AppT, T]) Init(appI AppT, v *viper.Viper, logger log.Logger) error {
cfg := DefaultConfig()
func (s *GRPCServer[AppT, T]) Init(appI AppT, v *viper.Viper, logger log.Logger) error {
cfg := s.Config().(*Config)
if v != nil {
if err := v.Sub(g.Name()).Unmarshal(&cfg); err != nil {
if err := v.Sub(s.Name()).Unmarshal(&cfg); err != nil {
return fmt.Errorf("failed to unmarshal config: %w", err)
}
}
@ -55,25 +47,42 @@ func (g *GRPCServer[AppT, T]) Init(appI AppT, v *viper.Viper, logger log.Logger)
// appI.RegisterGRPCServer(grpcSrv)
// Reflection allows external clients to see what services and methods
// the gRPC server exposes.
// Reflection allows external clients to see what services and methods the gRPC server exposes.
gogoreflection.Register(grpcSrv)
g.grpcSrv = grpcSrv
g.config = cfg
g.logger = logger.With(log.ModuleKey, g.Name())
s.grpcSrv = grpcSrv
s.config = cfg
s.logger = logger.With(log.ModuleKey, s.Name())
return nil
}
func (g *GRPCServer[AppT, T]) Name() string {
func (s *GRPCServer[AppT, T]) Name() string {
return "grpc"
}
func (g *GRPCServer[AppT, T]) Start(ctx context.Context) error {
listener, err := net.Listen("tcp", g.config.Address)
func (s *GRPCServer[AppT, T]) Config() any {
if s.config == nil || s.config == (&Config{}) {
cfg := DefaultConfig()
// overwrite the default config with the provided options
for _, opt := range s.cfgOptions {
opt(cfg)
}
return cfg
}
return s.config
}
func (s *GRPCServer[AppT, T]) Start(ctx context.Context) error {
if !s.config.Enable {
return nil
}
listener, err := net.Listen("tcp", s.config.Address)
if err != nil {
return fmt.Errorf("failed to listen on address %s: %w", g.config.Address, err)
return fmt.Errorf("failed to listen on address %s: %w", s.config.Address, err)
}
errCh := make(chan error)
@ -82,110 +91,24 @@ func (g *GRPCServer[AppT, T]) Start(ctx context.Context) error {
// an error upon failure, which we'll send on the error channel that will be
// consumed by the for block below.
go func() {
g.logger.Info("starting gRPC server...", "address", g.config.Address)
errCh <- g.grpcSrv.Serve(listener)
s.logger.Info("starting gRPC server...", "address", s.config.Address)
errCh <- s.grpcSrv.Serve(listener)
}()
// Start a blocking select to wait for an indication to stop the server or that
// the server failed to start properly.
err = <-errCh
g.logger.Error("failed to start gRPC server", "err", err)
s.logger.Error("failed to start gRPC server", "err", err)
return err
}
func (g *GRPCServer[AppT, T]) Stop(ctx context.Context) error {
g.logger.Info("stopping gRPC server...", "address", g.config.Address)
g.grpcSrv.GracefulStop()
func (s *GRPCServer[AppT, T]) Stop(ctx context.Context) error {
if !s.config.Enable {
return nil
}
s.logger.Info("stopping gRPC server...", "address", s.config.Address)
s.grpcSrv.GracefulStop()
return nil
}
func (g *GRPCServer[AppT, T]) Config() any {
if g.config == nil || g.config == (&Config{}) {
return DefaultConfig()
}
return g.config
}
type protoCodec struct {
interfaceRegistry appmanager.InterfaceRegistry
}
// newProtoCodec returns a reference to a new ProtoCodec
func newProtoCodec(interfaceRegistry appmanager.InterfaceRegistry) *protoCodec {
return &protoCodec{
interfaceRegistry: interfaceRegistry,
}
}
// Marshal implements BinaryMarshaler.Marshal method.
// NOTE: this function must be used with a concrete type which
// implements proto.Message. For interface please use the codec.MarshalInterface
func (pc *protoCodec) Marshal(o gogoproto.Message) ([]byte, error) {
// Size() check can catch the typed nil value.
if o == nil || gogoproto.Size(o) == 0 {
// return empty bytes instead of nil, because nil has special meaning in places like store.Set
return []byte{}, nil
}
return gogoproto.Marshal(o)
}
// Unmarshal implements BinaryMarshaler.Unmarshal method.
// NOTE: this function must be used with a concrete type which
// implements proto.Message. For interface please use the codec.UnmarshalInterface
func (pc *protoCodec) Unmarshal(bz []byte, ptr gogoproto.Message) error {
err := gogoproto.Unmarshal(bz, ptr)
if err != nil {
return err
}
// err = codectypes.UnpackInterfaces(ptr, pc.interfaceRegistry) // TODO: identify if needed for grpc
// if err != nil {
// return err
// }
return nil
}
func (pc *protoCodec) Name() string {
return "cosmos-sdk-grpc-codec"
}
// GRPCCodec returns the gRPC Codec for this specific ProtoCodec
func (pc *protoCodec) GRPCCodec() encoding.Codec {
return &grpcProtoCodec{cdc: pc}
}
// grpcProtoCodec is the implementation of the gRPC proto codec.
type grpcProtoCodec struct {
cdc appmanager.ProtoCodec
}
var errUnknownProtoType = errors.New("codec: unknown proto type") // sentinel error
func (g grpcProtoCodec) Marshal(v any) ([]byte, error) {
switch m := v.(type) {
case proto.Message:
protov2MarshalOpts := proto.MarshalOptions{Deterministic: true}
return protov2MarshalOpts.Marshal(m)
case gogoproto.Message:
return g.cdc.Marshal(m)
default:
return nil, fmt.Errorf("%w: cannot marshal type %T", errUnknownProtoType, v)
}
}
func (g grpcProtoCodec) Unmarshal(data []byte, v any) error {
switch m := v.(type) {
case proto.Message:
return proto.Unmarshal(data, m)
case gogoproto.Message:
return g.cdc.Unmarshal(data, m)
default:
return fmt.Errorf("%w: cannot unmarshal type %T", errUnknownProtoType, v)
}
}
func (g grpcProtoCodec) Name() string {
return "cosmos-sdk-grpc-codec"
}

View File

@ -1,12 +1,28 @@
package grpcgateway
func DefaultConfig() *Config {
return &Config{
Enable: true,
}
}
type Config struct {
// Enable defines if the gRPC-gateway should be enabled.
Enable bool `mapstructure:"enable" toml:"enable" comment:"Enable defines if the gRPC-gateway should be enabled."`
}
func DefaultConfig() Config {
return Config{
Enable: true,
type CfgOption func(*Config)
// OverwriteDefaultConfig overwrites the default config with the new config.
func OverwriteDefaultConfig(newCfg *Config) CfgOption {
return func(cfg *Config) {
*cfg = *newCfg
}
}
// Disable the grpc server by default (default enabled).
func Disable() CfgOption {
return func(cfg *Config) {
cfg.Enable = false
}
}

View File

@ -1,6 +1,8 @@
package grpcgateway
import (
"context"
"fmt"
"net/http"
"strings"
@ -8,25 +10,34 @@ import (
"github.com/cosmos/gogoproto/jsonpb"
"github.com/gorilla/mux"
"github.com/grpc-ecosystem/grpc-gateway/runtime"
"github.com/spf13/viper"
"google.golang.org/grpc"
"cosmossdk.io/core/transaction"
"cosmossdk.io/log"
serverv2 "cosmossdk.io/server/v2"
)
var _ serverv2.ServerComponent[
serverv2.AppI[transaction.Tx], transaction.Tx,
] = (*GRPCGatewayServer[serverv2.AppI[transaction.Tx], transaction.Tx])(nil)
const (
// GRPCBlockHeightHeader is the gRPC header for block height.
GRPCBlockHeightHeader = "x-cosmos-block-height"
)
type Server struct {
logger log.Logger
type GRPCGatewayServer[AppT serverv2.AppI[T], T transaction.Tx] struct {
logger log.Logger
config *Config
cfgOptions []CfgOption
GRPCSrv *grpc.Server
GRPCGatewayRouter *runtime.ServeMux
config Config
}
// New creates a new gRPC-gateway server.
func New(logger log.Logger, grpcSrv *grpc.Server, cfg Config, ir jsonpb.AnyResolver) *Server {
func New[AppT serverv2.AppI[T], T transaction.Tx](grpcSrv *grpc.Server, ir jsonpb.AnyResolver, cfgOptions ...CfgOption) *GRPCGatewayServer[AppT, T] {
// The default JSON marshaller used by the gRPC-Gateway is unable to marshal non-nullable non-scalar fields.
// Using the gogo/gateway package with the gRPC-Gateway WithMarshaler option fixes the scalar field marshaling issue.
marshalerOption := &gateway.JSONPb{
@ -35,8 +46,9 @@ func New(logger log.Logger, grpcSrv *grpc.Server, cfg Config, ir jsonpb.AnyResol
OrigName: true,
AnyResolver: ir,
}
return &Server{
logger: logger,
return &GRPCGatewayServer[AppT, T]{
GRPCSrv: grpcSrv,
GRPCGatewayRouter: runtime.NewServeMux(
// Custom marshaler option is required for gogo proto
runtime.WithMarshalerOption(runtime.MIMEWildcard, marshalerOption),
@ -49,10 +61,74 @@ func New(logger log.Logger, grpcSrv *grpc.Server, cfg Config, ir jsonpb.AnyResol
// GRPC metadata
runtime.WithIncomingHeaderMatcher(CustomGRPCHeaderMatcher),
),
config: cfg,
cfgOptions: cfgOptions,
}
}
func (g *GRPCGatewayServer[AppT, T]) Name() string {
return "grpc-gateway"
}
func (s *GRPCGatewayServer[AppT, T]) Config() any {
if s.config == nil || s.config == (&Config{}) {
cfg := DefaultConfig()
// overwrite the default config with the provided options
for _, opt := range s.cfgOptions {
opt(cfg)
}
return cfg
}
return s.config
}
func (s *GRPCGatewayServer[AppT, T]) Init(appI AppT, v *viper.Viper, logger log.Logger) error {
cfg := s.Config().(*Config)
if v != nil {
if err := v.Sub(s.Name()).Unmarshal(&cfg); err != nil {
return fmt.Errorf("failed to unmarshal config: %w", err)
}
}
// Register the gRPC-Gateway server.
// appI.RegisterGRPCGatewayRoutes(s.GRPCGatewayRouter, s.GRPCSrv)
s.logger = logger
s.config = cfg
return nil
}
func (s *GRPCGatewayServer[AppT, T]) Start(ctx context.Context) error {
if !s.config.Enable {
return nil
}
// TODO start a normal Go http server (and do not leverage comet's like https://github.com/cosmos/cosmos-sdk/blob/9df6019de6ee7999fe9864bac836deb2f36dd44a/server/api/server.go#L98)
return nil
}
func (s *GRPCGatewayServer[AppT, T]) Stop(ctx context.Context) error {
if !s.config.Enable {
return nil
}
return nil
}
// Register implements registers a grpc-gateway server
func (s *GRPCGatewayServer[AppT, T]) Register(r mux.Router) error {
// configure grpc-gatway server
r.PathPrefix("/").Handler(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
// Fall back to grpc gateway server.
s.GRPCGatewayRouter.ServeHTTP(w, req)
}))
return nil
}
// CustomGRPCHeaderMatcher for mapping request headers to
// GRPC metadata.
// HTTP headers that start with 'Grpc-Metadata-' are automatically mapped to
@ -67,16 +143,3 @@ func CustomGRPCHeaderMatcher(key string) (string, bool) {
return runtime.DefaultHeaderMatcher(key)
}
}
// Register implements registers a grpc-gateway server
func (s *Server) Register(r mux.Router) error {
// configure grpc-gatway server
if s.config.Enable {
r.PathPrefix("/").Handler(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
// Fall back to grpc gateway server.
s.GRPCGatewayRouter.ServeHTTP(w, req)
}))
}
return nil
}

View File

@ -5,7 +5,6 @@ import (
"github.com/spf13/viper"
serverv2 "cosmossdk.io/server/v2"
"cosmossdk.io/server/v2/api/grpc"
"cosmossdk.io/server/v2/cometbft/types"
)
@ -41,12 +40,19 @@ type Config struct {
Addr string `mapstructure:"addr" toml:"addr"`
Standalone bool `mapstructure:"standalone" toml:"standalone"`
Trace bool `mapstructure:"trace" toml:"trace"`
GrpcConfig grpc.Config
// MempoolConfig
CmtConfig *cmtcfg.Config
// Must be set by the application to grant authority to the consensus engine to send messages to the consensus module
ConsensusAuthority string
// config.toml
CmtConfig *cmtcfg.Config
}
// CmtCfgOption is a function that allows to overwrite the default server configuration.
type CmtCfgOption func(*cmtcfg.Config)
// OverwriteDefaultConfig overwrites the default config with the new config.
func OverwriteDefaultCometConfig(newCfg *cmtcfg.Config) CmtCfgOption {
return func(cfg *cmtcfg.Config) { // nolint:staticcheck // We want to overwrite everything
cfg = newCfg // nolint:ineffassign,staticcheck // We want to overwrite everything
}
}

View File

@ -41,16 +41,18 @@ type CometBFTServer[AppT serverv2.AppI[T], T transaction.Tx] struct {
Node *node.Node
Consensus *Consensus[T]
initTxCodec transaction.Codec[T]
logger log.Logger
config Config
options ServerOptions[T]
initTxCodec transaction.Codec[T]
logger log.Logger
config Config
options ServerOptions[T]
cmtConfigOptions []CmtCfgOption
}
func New[AppT serverv2.AppI[T], T transaction.Tx](txCodec transaction.Codec[T], options ServerOptions[T]) *CometBFTServer[AppT, T] {
func New[AppT serverv2.AppI[T], T transaction.Tx](txCodec transaction.Codec[T], options ServerOptions[T], cfgOptions ...CmtCfgOption) *CometBFTServer[AppT, T] {
return &CometBFTServer[AppT, T]{
initTxCodec: txCodec,
options: options,
initTxCodec: txCodec,
options: options,
cmtConfigOptions: cfgOptions,
}
}
@ -208,6 +210,10 @@ func (s *CometBFTServer[AppT, T]) CLICommands() serverv2.CLIConfig {
func (s *CometBFTServer[AppT, T]) WriteDefaultConfigAt(configPath string) error {
cometConfig := cmtcfg.DefaultConfig()
for _, opt := range s.cmtConfigOptions {
opt(cometConfig)
}
cmtcfg.WriteConfigFile(filepath.Join(configPath, "config.toml"), cometConfig)
return nil
}

View File

@ -33,6 +33,10 @@ func (s *mockServer) Name() string {
return s.name
}
func (s *mockServer) Init(appI serverv2.AppI[transaction.Tx], v *viper.Viper, logger log.Logger) error {
return nil
}
func (s *mockServer) Start(ctx context.Context) error {
for ctx.Err() == nil {
s.ch <- fmt.Sprintf("%s mock server: %d", s.name, rand.Int())
@ -54,7 +58,3 @@ func (s *mockServer) Stop(ctx context.Context) error {
func (s *mockServer) Config() any {
return MockServerDefaultConfig()
}
func (s *mockServer) Init(appI serverv2.AppI[transaction.Tx], v *viper.Viper, logger log.Logger) error {
return nil
}